Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

423 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature, getmodule 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import operator 

23import sys 

24from functools import cached_property 

25from dataclasses import dataclass, field 

26from types import MethodDescriptorType, ModuleType 

27 

28from IPython.utils.decorators import undoc 

29 

30 

31from typing import Self, LiteralString 

32 

33if sys.version_info < (3, 12): 

34 from typing_extensions import TypeAliasType 

35else: 

36 from typing import TypeAliasType 

37 

38 

@undoc
class HasGetItem(Protocol):
    """Structural type of objects that implement ``__getitem__``."""

    def __getitem__(self, key) -> None:
        ...

42 

43 

@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type of callables (e.g. classes) whose result implements ``__getitem__``."""

    def __call__(self, *args, **kwargs) -> HasGetItem:
        ...

47 

48 

@undoc
class HasGetAttr(Protocol):
    """Structural type of objects that explicitly implement ``__getattr__``."""

    def __getattr__(self, key) -> None:
        ...

52 

53 

@undoc
class DoesNotHaveGetAttr(Protocol):
    """Empty protocol: places no requirement on the object (no ``__getattr__`` needed)."""

57 

58 

# By default `__getattr__` is not explicitly implemented on most objects,
# so the alias accepts both objects that define it and objects that do not.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]

61 

62 

63def _unbind_method(func: Callable) -> Union[Callable, None]: 

64 """Get unbound method for given bound method. 

65 

66 Returns None if cannot get unbound method, or method is already unbound. 

67 """ 

68 owner = getattr(func, "__self__", None) 

69 owner_class = type(owner) 

70 name = getattr(func, "__name__", None) 

71 instance_dict_overrides = getattr(owner, "__dict__", None) 

72 if ( 

73 owner is not None 

74 and name 

75 and ( 

76 not instance_dict_overrides 

77 or (instance_dict_overrides and name not in instance_dict_overrides) 

78 ) 

79 ): 

80 return getattr(owner_class, name) 

81 return None 

82 

83 

@undoc
@dataclass
class EvaluationPolicy:
    """Definition of evaluation policy.

    Each ``allow_*`` flag switches one kind of access on or off as a whole;
    ``allowed_calls`` additionally allow-lists individual callables.
    The ``can_*`` methods are the hooks queried by the evaluator; they always
    return a ``bool``.
    """

    allow_locals_access: bool = False
    allow_globals_access: bool = False
    allow_item_access: bool = False
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    allow_all_operations: bool = False
    allow_any_calls: bool = False
    allow_auto_import: bool = False
    allowed_calls: set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item):
        """Return whether ``value[item]`` may be evaluated."""
        return self.allow_item_access

    def can_get_attr(self, value, attr):
        """Return whether ``value.attr`` may be evaluated."""
        return self.allow_attr_access

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether the operator backed by ``dunders`` may be applied to ``a`` (and ``b``)."""
        return bool(self.allow_all_operations)

    def can_call(self, func):
        """Return whether ``func`` may be called.

        A callable is accepted when any call is allowed, when it is itself
        allow-listed, or when it is a bound method whose unbound counterpart
        is allow-listed.
        """
        if self.allow_any_calls:
            return True

        if self._is_allow_listed(func):
            return True

        owner_method = _unbind_method(func)
        return owner_method is not None and self._is_allow_listed(owner_method)

    def _is_allow_listed(self, candidate) -> bool:
        """Return whether ``candidate`` is a member of ``allowed_calls``."""
        try:
            return candidate in self.allowed_calls
        except TypeError:
            # Set membership hashes the candidate; an unhashable object can
            # never have been added to the set, so it is simply not allowed.
            return False

120 

121 

122def _get_external(module_name: str, access_path: Sequence[str]): 

123 """Get value from external module given a dotted access path. 

124 

125 Only gets value if the module is already imported. 

126 

127 Raises: 

128 * `KeyError` if module is removed not found, and 

129 * `AttributeError` if access path does not match an exported object 

130 """ 

131 try: 

132 member_type = sys.modules[module_name] 

133 # standard module 

134 for attr in access_path: 

135 member_type = getattr(member_type, attr) 

136 return member_type 

137 except (KeyError, AttributeError): 

138 # handle modules in namespace packages 

139 module_path = ".".join([module_name, *access_path]) 

140 if module_path in sys.modules: 

141 return sys.modules[module_path] 

142 raise 

143 

144 

145def _has_original_dunder_external( 

146 value, 

147 module_name: str, 

148 access_path: Sequence[str], 

149 method_name: str, 

150): 

151 if module_name not in sys.modules: 

152 full_module_path = ".".join([module_name, *access_path]) 

153 if full_module_path not in sys.modules: 

154 # LBYLB as it is faster 

155 return False 

156 try: 

157 member_type = _get_external(module_name, access_path) 

158 value_type = type(value) 

159 if type(value) == member_type: 

160 return True 

161 if isinstance(member_type, ModuleType): 

162 value_module = getmodule(value_type) 

163 if not value_module or not value_module.__name__: 

164 return False 

165 if value_module.__name__.startswith(member_type.__name__): 

166 return True 

167 if method_name == "__getattribute__": 

168 # we have to short-circuit here due to an unresolved issue in 

169 # `isinstance` implementation: https://bugs.python.org/issue32683 

170 return False 

171 if not isinstance(member_type, ModuleType) and isinstance(value, member_type): 

172 method = getattr(value_type, method_name, None) 

173 member_method = getattr(member_type, method_name, None) 

174 if member_method == method: 

175 return True 

176 except (AttributeError, KeyError): 

177 return False 

178 

179 

180def _has_original_dunder( 

181 value, allowed_types, allowed_methods, allowed_external, method_name 

182): 

183 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

184 # we only need to check at class level 

185 value_type = type(value) 

186 

187 # strict type check passes → no need to check method 

188 if value_type in allowed_types: 

189 return True 

190 

191 method = getattr(value_type, method_name, None) 

192 

193 if method is None: 

194 return None 

195 

196 if method in allowed_methods: 

197 return True 

198 

199 for module_name, *access_path in allowed_external: 

200 if _has_original_dunder_external(value, module_name, access_path, method_name): 

201 return True 

202 

203 return False 

204 

205 

206def _coerce_path_to_tuples( 

207 allow_list: set[tuple[str, ...] | str], 

208) -> set[tuple[str, ...]]: 

209 """Replace dotted paths on the provided allow-list with tuples.""" 

210 return { 

211 path if isinstance(path, tuple) else tuple(path.split(".")) 

212 for path in allow_list 

213 } 

214 

215 

@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Policy permitting access and operators only for allow-listed types.

    For each of item access, attribute access and operators there is a set of
    allow-listed types plus a set of "external" entries: dotted paths (or
    tuples of path segments) that are resolved only if the corresponding
    module has already been imported.

    NOTE: the ``_get*_methods`` properties and ``_operation_methods_cache``
    are computed on first use and are not refreshed if the allow-list sets
    are replaced afterwards.
    """

    allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: set[tuple[str, ...] | str] = field(default_factory=set)

    allowed_operations: set = field(default_factory=set)
    allowed_operations_external: set[tuple[str, ...] | str] = field(default_factory=set)

    _operation_methods_cache: dict[str, set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Allow attribute access if the attribute machinery of ``value`` was not modified.

        Both ``__getattribute__`` and (if defined) ``__getattr__`` of the type
        must be allow-listed. Properties are additionally required to be
        unchanged with respect to an allow-listed type.
        """
        allowed_getattr_external = _coerce_path_to_tuples(self.allowed_getattr_external)

        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=allowed_getattr_external,
            method_name="__getattr__",
        )

        # Many objects do not have `__getattr__` (reported as None), this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if not accept:
            return False

        # We still need to check for overridden properties.
        value_class = type(value)
        if not hasattr(value_class, attr):
            return True

        class_attr_val = getattr(value_class, attr)
        if not isinstance(class_attr_val, property):
            return True

        # Properties in allowed types are ok (although we do not include any
        # properties in our default allow list currently).
        if value_class in self.allowed_getattr:
            return True  # pragma: no cover

        # Properties in subclasses of allowed types may be ok if not changed.
        # Every external entry is consulted: the set has no defined order, so
        # stopping at the first entry would make the answer depend on which
        # one happens to be iterated first.
        for module_name, *access_path in allowed_getattr_external:
            try:
                external_class = _get_external(module_name, access_path)
                external_class_attr_val = getattr(external_class, attr)
            except (KeyError, AttributeError):
                # not imported, or this external class lacks the attribute
                continue
            if class_attr_val == external_class_attr_val:
                return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances if it was not modified."""
        allowed_getitem_external = _coerce_path_to_tuples(self.allowed_getitem_external)
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Allow an operator if every dunder in ``dunders`` is original on every operand."""
        allowed_operations_external = _coerce_path_to_tuples(
            self.allowed_operations_external
        )
        objects = [a]
        if b is not None:
            objects.append(b)
        return all(
            _has_original_dunder(
                obj,
                allowed_types=self.allowed_operations,
                allowed_methods=self._operator_dunder_methods(dunder),
                allowed_external=allowed_operations_external,
                method_name=dunder,
            )
            for dunder in dunders
            for obj in objects
        )

    def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
        """Return (and memoise) the ``dunder`` implementations of the allowed operation types."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> set[Callable]:
        """Collect attribute ``name`` from each class, skipping classes lacking it."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }

346 

347 

348class _DummyNamedTuple(NamedTuple): 

349 """Used internally to retrieve methods of named tuple instance.""" 

350 

351 

352class EvaluationContext(NamedTuple): 

353 #: Local namespace 

354 locals: dict 

355 #: Global namespace 

356 globals: dict 

357 #: Evaluation policy identifier 

358 evaluation: Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"] = ( 

359 "forbidden" 

360 ) 

361 #: Whether the evaluation of code takes place inside of a subscript. 

362 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``. 

363 in_subscript: bool = False 

364 #: Auto import method 

365 auto_import: Callable[list[str], ModuleType] | None = None 

366 #: Overrides for evaluation policy 

367 policy_overrides: dict = {} 

368 

369 

370class _IdentitySubscript: 

371 """Returns the key itself when item is requested via subscript.""" 

372 

373 def __getitem__(self, key): 

374 return key 

375 

376 

# Shared instance that `guarded_eval` places into locals under SUBSCRIPT_MARKER
IDENTITY_SUBSCRIPT = _IdentitySubscript()
# Name prepended (as ``NAME[...]``) to code evaluated inside of a subscript
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
# Empty signature used by `_eval_return_type` when `signature()` fails
UNKNOWN_SIGNATURE = Signature()
# Sentinel meaning "no value could be determined" (distinct from None)
NOT_EVALUATED = object()

381 

382 

class GuardRejection(Exception):
    """Raised when the guard refuses to evaluate the requested code."""

387 

388 

def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate provided code in the evaluation context.

    Depending on ``context.evaluation``:

    - ``forbidden``: nothing is evaluated, `GuardRejection` is raised;
    - ``dangerous``: the code is handed to the standard :func:`eval`;
    - anything else: the code is parsed and passed to :func:`eval_node`.

    With ``context.in_subscript`` set, empty code evaluates to an empty
    tuple, and other code is evaluated as the content of a subscript.
    """
    mode = context.evaluation

    if mode == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: `ast.literal_eval` is of no use here as it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # slice syntax (`:`) is only valid inside of a subscript, so the code
        # is wrapped into a subscript of a marker object which returns the key
        # it is given; this way no real `__getitem__` gets involved
        if not code:
            return tuple()
        subscript_locals = context.locals.copy()
        subscript_locals[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = "".join((SUBSCRIPT_MARKER, "[", code, "]"))
        context = EvaluationContext(
            **(context._asdict() | {"locals": subscript_locals})
        )

    if mode == "dangerous":
        # NOTE: unrestricted `eval`, reached only on explicit opt-in
        return eval(code, context.globals, context.locals)

    return eval_node(ast.parse(code, mode="eval"), context)

423 

424 

# Binary operator AST node -> dunder method(s) implementing it.
# `eval_node` calls the first listed dunder on the left operand.
BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    ast.MatMult: ("__matmul__",),
}

# Comparison AST node -> dunder methods checked by the policy; the first one
# names the `operator` module function used to perform the comparison.
COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}

# Unary operator AST node -> dunder methods checked by the policy; the first
# one is the method `eval_node` calls on the operand.
UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    # NOTE(review): objects do not define a `__not__` special method (truth
    # testing goes through `__bool__`/`__len__`), so this entry looks like it
    # can never resolve to a real method - confirm the intended handling.
    ast.Not: ("__not__",),
}

459 

460 

class ImpersonatingDuck:
    """Placeholder whose instances get re-labelled as another class.

    Used to obtain an object of a given class without running that class'
    ``__init__``.
    """

    # intentionally empty: impersonation happens by assigning `__class__`

465 

466 

467class _Duck: 

468 """A dummy class used to create objects pretending to have given attributes""" 

469 

470 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

471 self.attributes = attributes or {} 

472 self.items = items or {} 

473 

474 def __getattr__(self, attr: str): 

475 return self.attributes[attr] 

476 

477 def __hasattr__(self, attr: str): 

478 return attr in self.attributes 

479 

480 def __dir__(self): 

481 return [*dir(super), *self.attributes] 

482 

483 def __getitem__(self, key: str): 

484 return self.items[key] 

485 

486 def __hasitem__(self, key: str): 

487 return self.items[key] 

488 

489 def _ipython_key_completions_(self): 

490 return self.items.keys() 

491 

492 

493def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

494 dunder = None 

495 for op, candidate_dunder in dunders.items(): 

496 if isinstance(node_op, op): 

497 dunder = candidate_dunder 

498 return dunder 

499 

500 

def get_policy(context: EvaluationContext) -> EvaluationPolicy:
    """Return a copy of the policy selected by ``context`` with overrides applied.

    The registered policy for ``context.evaluation`` is shallow-copied and
    each entry of ``context.policy_overrides`` naming an existing attribute
    is set on the copy. Unknown keys are reported on standard output and
    otherwise ignored.
    """
    policy = copy(EVALUATION_POLICIES[context.evaluation])

    for key, value in context.policy_overrides.items():
        if not hasattr(policy, key):
            print(f"Incorrect policy override key: {key}")
            continue
        setattr(policy, key, value)
    return policy

510 

511 

def _call_binary_dunder(policy, dunder, left, right, context):
    """Apply the binary ``dunder`` of ``left`` to ``right``, honouring ``NotImplemented``.

    When the left operand declines (e.g. ``(1).__add__(1.5)``), the reflected
    method of the right operand is used instead, provided the policy permits
    that operation on the right operand.
    """
    result = getattr(left, dunder)(right)
    if result is not NotImplemented:
        return result

    reflected = "__r" + dunder[2:]
    if not policy.can_operate((reflected,), right):
        raise GuardRejection(
            f"Operation (`{reflected}`) for",
            type(right),
            f"not allowed in {context.evaluation} mode",
        )
    reflected_method = getattr(right, reflected, None)
    result = NotImplemented if reflected_method is None else reflected_method(left)
    if result is NotImplemented:
        raise TypeError(
            f"unsupported operand type(s) for {dunder}: {type(left)} and {type(right)}"
        )
    return result


def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
    """Evaluate AST node in provided context.

    Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.

    Does not evaluate actions that always have side effects:

    - class definitions (``class sth: ...``)
    - function definitions (``def sth: ...``)
    - variable assignments (``x = 1``)
    - augmented assignments (``x += 1``)
    - deletions (``del x``)

    Does not evaluate operations which do not return values:

    - assertions (``assert x``)
    - pass (``pass``)
    - imports (``import x``)
    - control flow:

        - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
        - loops (``for`` and ``while``)
        - exception handling

    The purpose of this function is to guard against unwanted side-effects;
    it does not give guarantees on protection from malicious code execution.

    Raises `GuardRejection` when the policy forbids an operation and
    `ValueError` for unsupported node types.
    """
    if node is None:
        # absent optional parts (e.g. slice bounds): no policy needed
        return None

    policy = get_policy(context)

    if isinstance(node, ast.Expression):
        return eval_node(node.body, context)
    if isinstance(node, ast.BinOp):
        left = eval_node(node.left, context)
        right = eval_node(node.right, context)
        dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, left, right):
                return _call_binary_dunder(policy, dunders[0], left, right, context)
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(left),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Compare):
        left = eval_node(node.left, context)
        all_true = True
        for op, right in zip(node.ops, node.comparators):
            right = eval_node(right, context)
            # reset for every link of a chained comparison, otherwise a
            # negated operator would also invert all the following ones
            negate = False
            dunder = None
            dunders = _find_dunder(op, COMP_OP_DUNDERS)
            if not dunders:
                if isinstance(op, ast.NotIn):
                    dunders = COMP_OP_DUNDERS[ast.In]
                    negate = True
                if isinstance(op, ast.Is):
                    dunder = "is_"
                if isinstance(op, ast.IsNot):
                    dunder = "is_"
                    negate = True
            if not dunder and dunders:
                dunder = dunders[0]
            if dunder:
                # `operator.__contains__` takes the container first
                a, b = (right, left) if dunder == "__contains__" else (left, right)
                if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
                    result = getattr(operator, dunder)(a, b)
                    if negate:
                        result = not result
                    if not result:
                        all_true = False
                    left = right
                else:
                    raise GuardRejection(
                        f"Comparison (`{dunder}`) for",
                        type(left),
                        f"not allowed in {context.evaluation} mode",
                    )
            else:
                raise ValueError(
                    f"Comparison `{dunder}` not supported"
                )  # pragma: no cover
        return all_true
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Tuple):
        return tuple(eval_node(e, context) for e in node.elts)
    if isinstance(node, ast.List):
        return [eval_node(e, context) for e in node.elts]
    if isinstance(node, ast.Set):
        return {eval_node(e, context) for e in node.elts}
    if isinstance(node, ast.Dict):
        return dict(
            zip(
                [eval_node(k, context) for k in node.keys],
                [eval_node(v, context) for v in node.values],
            )
        )
    if isinstance(node, ast.Slice):
        return slice(
            eval_node(node.lower, context),
            eval_node(node.upper, context),
            eval_node(node.step, context),
        )
    if isinstance(node, ast.UnaryOp):
        value = eval_node(node.operand, context)
        if isinstance(node.op, ast.Not):
            # `not x` is a truth test; no object defines a `__not__` method,
            # so the policy is asked about `__bool__` instead.
            if policy.can_operate(("__bool__",), value):
                return not value
            raise GuardRejection(
                "Operation (`not`) for",
                type(value),
                f"not allowed in {context.evaluation} mode",
            )
        dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, value):
                try:
                    return getattr(value, dunders[0])()
                except AttributeError:
                    raise TypeError(
                        f"bad operand type for unary {node.op}: {type(value)}"
                    )
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(value),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Subscript):
        value = eval_node(node.value, context)
        slice_ = eval_node(node.slice, context)
        if policy.can_get_item(value, slice_):
            return value[slice_]
        raise GuardRejection(
            "Subscript access (`__getitem__`) for",
            type(value),  # not joined to avoid calling `repr`
            f" not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.Name):
        return _eval_node_name(node.id, context)
    if isinstance(node, ast.Attribute):
        value = eval_node(node.value, context)
        if policy.can_get_attr(value, node.attr):
            return getattr(value, node.attr)
        raise GuardRejection(
            "Attribute access (`__getattr__`) for",
            type(value),  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.IfExp):
        test = eval_node(node.test, context)
        if test:
            return eval_node(node.body, context)
        else:
            return eval_node(node.orelse, context)
    if isinstance(node, ast.Call):
        func = eval_node(node.func, context)
        if policy.can_call(func) and not node.keywords:
            args = [eval_node(arg, context) for arg in node.args]
            return func(*args)
        if isclass(func):
            # this code path gets entered when calling class e.g. `MyClass()`
            # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
            # Should return `MyClass` if `__new__` is not overridden,
            # otherwise whatever `__new__` return type is.
            overridden_return_type = _eval_return_type(func.__new__, node, context)
            if overridden_return_type is not NOT_EVALUATED:
                return overridden_return_type
            return _create_duck_for_heap_type(func)
        else:
            return_type = _eval_return_type(func, node, context)
            if return_type is not NOT_EVALUATED:
                return return_type
        raise GuardRejection(
            "Call for",
            func,  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    raise ValueError("Unhandled node", ast.dump(node))

686 

687 

def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
    """Evaluate return type of a given callable function.

    The return annotation found by `inspect.signature` is resolved with
    `_resolve_annotation`.

    Returns the built-in type, a duck or NOT_EVALUATED sentinel. The sentinel
    is returned when there is no return annotation or when no signature can
    be obtained for ``func``.
    """
    try:
        sig = signature(func)
    except (ValueError, TypeError):
        # ValueError: no signature can be provided;
        # TypeError: this kind of object is not supported by `signature`
        sig = UNKNOWN_SIGNATURE
    # if annotation was not stringized, or it was stringized
    # but resolved by signature call we know the return type
    if sig.return_annotation is Signature.empty:
        return NOT_EVALUATED
    return _resolve_annotation(sig.return_annotation, sig, func, node, context)

703 

704 

def _resolve_annotation(
    annotation,
    sig: Signature,
    func: Callable,
    node: ast.Call,
    context: EvaluationContext,
):
    """Resolve annotation created by user with `typing` module and custom objects.

    ``annotation`` is the (possibly stringized) annotation to resolve,
    ``sig`` and ``func`` describe the callable it belongs to, and ``node`` is
    the call being evaluated (its arguments are needed for ``AnyStr``).

    Returns a value standing in for an object of the annotated type, or the
    NOT_EVALUATED sentinel when the annotation cannot be resolved.
    """
    # local import: only needed to recognise PEP 604 (`X | Y`) unions
    from types import UnionType

    annotation = (
        _eval_node_name(annotation, context)
        if isinstance(annotation, str)
        else annotation
    )
    origin = get_origin(annotation)
    if annotation is Self and hasattr(func, "__self__"):
        return func.__self__
    elif origin is Literal:
        type_args = get_args(annotation)
        if len(type_args) == 1:
            return type_args[0]
    elif annotation is LiteralString:
        return ""
    elif annotation is AnyStr:
        # the result has the type of the first argument annotated as AnyStr
        index = None
        for i, parameter in enumerate(sig.parameters.values()):
            if parameter.annotation is AnyStr:
                index = i
                break
        if index is not None and index < len(node.args):
            return eval_node(node.args[index], context)
    elif origin is TypeGuard:
        return False
    elif origin is Union or origin is UnionType:
        # offer the attributes of every member of the union
        attributes = [
            attr
            for type_arg in get_args(annotation)
            for attr in dir(_resolve_annotation(type_arg, sig, func, node, context))
        ]
        return _Duck(attributes=dict.fromkeys(attributes))
    elif is_typeddict(annotation):
        return _Duck(
            attributes=dict.fromkeys(dir(dict())),
            items={
                k: _resolve_annotation(v, sig, func, node, context)
                for k, v in annotation.__annotations__.items()
            },
        )
    elif hasattr(annotation, "_is_protocol"):
        return _Duck(attributes=dict.fromkeys(dir(annotation)))
    elif origin is Annotated:
        type_arg = get_args(annotation)[0]
        return _resolve_annotation(type_arg, sig, func, node, context)
    elif isinstance(annotation, NewType):
        return _eval_or_create_duck(annotation.__supertype__, node, context)
    elif isinstance(annotation, TypeAliasType):
        return _eval_or_create_duck(annotation.__value__, node, context)
    else:
        return _eval_or_create_duck(annotation, node, context)
    # A branch matched but produced nothing (multi-valued `Literal`, `AnyStr`
    # without a matching argument): report that explicitly rather than
    # letting None pass for the evaluated value.
    return NOT_EVALUATED

763 

764 

def _eval_node_name(node_id: str, context: EvaluationContext):
    """Look up the name ``node_id`` as far as the policy of ``context`` permits.

    Locals, globals and builtins are consulted in that order, each only if
    the policy allows access to it; the auto-import callable of the context
    is the last resort.

    Raises `GuardRejection` if neither locals nor globals may be accessed,
    `NameError` otherwise.
    """
    policy = get_policy(context)

    namespaces = (
        (policy.allow_locals_access, context.locals),
        (policy.allow_globals_access, context.globals),
    )
    for permitted, namespace in namespaces:
        if permitted and node_id in namespace:
            return namespace[node_id]

    if policy.allow_builtins_access and hasattr(builtins, node_id):
        # note: do not use __builtins__, it is implementation detail of cPython
        return getattr(builtins, node_id)

    if policy.allow_auto_import and context.auto_import:
        return context.auto_import(node_id)

    if policy.allow_globals_access or policy.allow_locals_access:
        raise NameError(f"{node_id} not found in locals, globals, nor builtins")
    raise GuardRejection(
        f"Namespace access not allowed in {context.evaluation} mode"
    )

782 

783 

def _eval_or_create_duck(duck_type, node: ast.Call, context: EvaluationContext):
    """Produce an object of ``duck_type``: a real instance if calling it is allowed, else a duck.

    A real instance is created from the positional arguments of ``node``,
    and only when the call has no keyword arguments.
    """
    policy = get_policy(context)
    # custom classes (or calls with keywords) cannot be instantiated: mock them
    if not policy.can_call(duck_type) or node.keywords:
        return _create_duck_for_heap_type(duck_type)
    # allow-listed builtin on the type annotation: instantiate it for real
    return duck_type(*(eval_node(arg, context) for arg in node.args))

792 

793 

def _create_duck_for_heap_type(duck_type):
    """Create an imitation of an object of a given type (a duck).

    The imitation is an `ImpersonatingDuck` whose ``__class__`` is switched
    to ``duck_type``.

    Returns the duck or NOT_EVALUATED sentinel if duck could not be created.
    """
    duck = ImpersonatingDuck()
    try:
        # assignment is accepted for heap types only, builtins raise TypeError
        duck.__class__ = duck_type
    except TypeError:
        return NOT_EVALUATED
    return duck

807 

808 

# Access paths (module followed by attribute names) of third-party objects
# whose `__getitem__` is allowed by the "limited" policy. They are resolved
# lazily and only if the module has already been imported.
SUPPORTED_EXTERNAL_GETITEM = {
    ("pandas", "core", "indexing", "_iLocIndexer"),
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    ("numpy", "ndarray"),
    ("numpy", "void"),
}


# Built-in and standard library types whose `__getitem__` is allowed by the
# "limited" policy.
BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
    dict,
    str,  # type: ignore[arg-type]
    bytes,  # type: ignore[arg-type]
    list,
    tuple,
    collections.defaultdict,
    collections.deque,
    collections.OrderedDict,
    collections.ChainMap,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    _DummyNamedTuple,
    _IdentitySubscript,
}

835 

836 

837def _list_methods(cls, source=None): 

838 """For use on immutable objects or with methods returning a copy""" 

839 return [getattr(cls, k) for k in (source if source else dir(cls))] 

840 

841 

# Names of container methods that may be called under the "limited" policy
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# every name `set` has in common with `frozenset`
set_non_mutating_methods = set(dir(set)) & set(dir(frozenset))


# Concrete type of the view object returned by `dict.keys()`
dict_keys: type[collections.abc.KeysView] = type({}.keys())

NUMERICS = {int, float, complex}

# Callables accepted by `EvaluationPolicy.can_call` under the "limited"
# policy: selected types together with (a subset of) their methods.
ALLOWED_CALLS = {
    bytes,
    *_list_methods(bytes),
    dict,
    *_list_methods(dict, dict_non_mutating_methods),
    dict_keys.isdisjoint,
    list,
    *_list_methods(list, list_non_mutating_methods),
    set,
    *_list_methods(set, set_non_mutating_methods),
    frozenset,
    *_list_methods(frozenset),
    range,
    str,
    *_list_methods(str),
    tuple,
    *_list_methods(tuple),
    *NUMERICS,
    *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
    collections.deque,
    *_list_methods(collections.deque, list_non_mutating_methods),
    collections.defaultdict,
    *_list_methods(collections.defaultdict, dict_non_mutating_methods),
    collections.OrderedDict,
    *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
    collections.UserDict,
    *_list_methods(collections.UserDict, dict_non_mutating_methods),
    collections.UserList,
    *_list_methods(collections.UserList, list_non_mutating_methods),
    collections.UserString,
    *_list_methods(collections.UserString, dir(str)),
    collections.Counter,
    *_list_methods(collections.Counter, dict_non_mutating_methods),
    collections.Counter.elements,
    collections.Counter.most_common,
}

# Types whose attribute access is allowed by the "limited" policy
BUILTIN_GETATTR: set[MayHaveGetattr] = {
    *BUILTIN_GETITEM,
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    MethodDescriptorType,
    ModuleType,
}


# Types on which operators are allowed by the "limited" policy
BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}

# Policy objects keyed by the identifier used in `EvaluationContext.evaluation`.
# "forbidden" and "dangerous" have no entry here; `guarded_eval` handles
# those two identifiers itself before any policy lookup.
EVALUATION_POLICIES = {
    "minimal": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=False,
        allow_globals_access=False,
        allow_item_access=False,
        allow_attr_access=False,
        allowed_calls=set(),
        allow_any_calls=False,
        allow_all_operations=False,
    ),
    "limited": SelectivePolicy(
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
        allowed_operations=BUILTIN_OPERATIONS,
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allowed_calls=ALLOWED_CALLS,
    ),
    "unsafe": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_any_calls=True,
        allow_all_operations=True,
    ),
}


# Names exported by `from ... import *`
# (note that the underscore-prefixed `_unbind_method` is among them)
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]