Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/guarded_eval.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

401 statements  

1from copy import copy 

2from inspect import isclass, signature, Signature 

3from typing import ( 

4 Annotated, 

5 AnyStr, 

6 Callable, 

7 Literal, 

8 NamedTuple, 

9 NewType, 

10 Optional, 

11 Protocol, 

12 Sequence, 

13 TypeGuard, 

14 Union, 

15 get_args, 

16 get_origin, 

17 is_typeddict, 

18) 

19import ast 

20import builtins 

21import collections 

22import operator 

23import sys 

24from functools import cached_property 

25from dataclasses import dataclass, field 

26from types import MethodDescriptorType, ModuleType 

27 

28from IPython.utils.decorators import undoc 

29 

30 

31from typing import Self, LiteralString 

32 

33if sys.version_info < (3, 12): 

34 from typing_extensions import TypeAliasType 

35else: 

36 from typing import TypeAliasType 

37 

38 

@undoc
class HasGetItem(Protocol):
    """Structural type: any object exposing ``__getitem__``."""

    def __getitem__(self, key) -> None:
        ...

42 

43 

@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type: a callable whose result exposes ``__getitem__``."""

    def __call__(self, *args, **kwargs) -> HasGetItem:
        ...

47 

48 

@undoc
class HasGetAttr(Protocol):
    """Structural type: any object that explicitly defines ``__getattr__``."""

    def __getattr__(self, key) -> None:
        ...

52 

53 

@undoc
class DoesNotHaveGetAttr(Protocol):
    """Structural type with no members, standing for objects without ``__getattr__``."""

57 

58 

# Most objects never implement `__getattr__` explicitly, so a type that may or
# may not define it is expressed as the union of both protocol shapes.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]

61 

62 

63def _unbind_method(func: Callable) -> Union[Callable, None]: 

64 """Get unbound method for given bound method. 

65 

66 Returns None if cannot get unbound method, or method is already unbound. 

67 """ 

68 owner = getattr(func, "__self__", None) 

69 owner_class = type(owner) 

70 name = getattr(func, "__name__", None) 

71 instance_dict_overrides = getattr(owner, "__dict__", None) 

72 if ( 

73 owner is not None 

74 and name 

75 and ( 

76 not instance_dict_overrides 

77 or (instance_dict_overrides and name not in instance_dict_overrides) 

78 ) 

79 ): 

80 return getattr(owner_class, name) 

81 return None 

82 

83 

@undoc
@dataclass
class EvaluationPolicy:
    """Definition of evaluation policy.

    Every ``allow_*`` flag defaults to ``False``. The ``can_*`` predicates are
    what the evaluator consults; each returns a plain ``bool``.
    """

    allow_locals_access: bool = False
    allow_globals_access: bool = False
    allow_item_access: bool = False
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    allow_all_operations: bool = False
    allow_any_calls: bool = False
    allow_auto_import: bool = False
    # Callables that may be invoked even when ``allow_any_calls`` is off.
    allowed_calls: set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item):
        """Return whether ``value[item]`` may be evaluated."""
        return self.allow_item_access

    def can_get_attr(self, value, attr):
        """Return whether ``value.attr`` may be evaluated."""
        return self.allow_attr_access

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether the operation named by ``dunders`` may be applied to ``a`` (and ``b``)."""
        return bool(self.allow_all_operations)

    def can_call(self, func):
        """Return whether ``func`` may be called.

        A call is permitted when any call is allowed, when ``func`` itself is
        in ``allowed_calls``, or when ``func`` is a bound method whose unbound
        counterpart (as found by ``_unbind_method``) is in ``allowed_calls``.
        """
        if self.allow_any_calls:
            return True

        if func in self.allowed_calls:
            return True

        owner_method = _unbind_method(func)
        if owner_method and owner_method in self.allowed_calls:
            return True

        # Explicit negative answer, consistent with the other predicates.
        return False

120 

121 

122def _get_external(module_name: str, access_path: Sequence[str]): 

123 """Get value from external module given a dotted access path. 

124 

125 Raises: 

126 * `KeyError` if module is removed not found, and 

127 * `AttributeError` if access path does not match an exported object 

128 """ 

129 member_type = sys.modules[module_name] 

130 for attr in access_path: 

131 member_type = getattr(member_type, attr) 

132 return member_type 

133 

134 

135def _has_original_dunder_external( 

136 value, 

137 module_name: str, 

138 access_path: Sequence[str], 

139 method_name: str, 

140): 

141 if module_name not in sys.modules: 

142 # LBYLB as it is faster 

143 return False 

144 try: 

145 member_type = _get_external(module_name, access_path) 

146 value_type = type(value) 

147 if type(value) == member_type: 

148 return True 

149 if method_name == "__getattribute__": 

150 # we have to short-circuit here due to an unresolved issue in 

151 # `isinstance` implementation: https://bugs.python.org/issue32683 

152 return False 

153 if isinstance(value, member_type): 

154 method = getattr(value_type, method_name, None) 

155 member_method = getattr(member_type, method_name, None) 

156 if member_method == method: 

157 return True 

158 except (AttributeError, KeyError): 

159 return False 

160 

161 

162def _has_original_dunder( 

163 value, allowed_types, allowed_methods, allowed_external, method_name 

164): 

165 # note: Python ignores `__getattr__`/`__getitem__` on instances, 

166 # we only need to check at class level 

167 value_type = type(value) 

168 

169 # strict type check passes → no need to check method 

170 if value_type in allowed_types: 

171 return True 

172 

173 method = getattr(value_type, method_name, None) 

174 

175 if method is None: 

176 return None 

177 

178 if method in allowed_methods: 

179 return True 

180 

181 for module_name, *access_path in allowed_external: 

182 if _has_original_dunder_external(value, module_name, access_path, method_name): 

183 return True 

184 

185 return False 

186 

187 

@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Policy granting access per type, based on allow-lists.

    Item access, attribute access and operators are permitted only when
    ``_has_original_dunder`` confirms that the relevant dunder method of the
    value's class is an allow-listed implementation. The ``*_external`` sets
    hold ``(module_name, *access_path)`` tuples naming types from modules
    that may not be imported.
    """

    allowed_getitem: set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: set[tuple[str, ...]] = field(default_factory=set)

    allowed_getattr: set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: set[tuple[str, ...]] = field(default_factory=set)

    allowed_operations: set = field(default_factory=set)
    allowed_operations_external: set[tuple[str, ...]] = field(default_factory=set)

    # Per-dunder cache of the allow-listed operator implementations.
    _operation_methods_cache: dict[str, set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Return whether ``value.attr`` may be evaluated.

        Both ``__getattribute__`` and ``__getattr__`` of the value's class
        must be allow-listed (a class without ``__getattr__`` is fine). If the
        attribute is a ``property`` on the class, it is accepted only when the
        exact type is allow-listed or the property object equals the one
        defined on an allow-listed external class.
        """
        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=self.allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=self.allowed_getattr_external,
            method_name="__getattr__",
        )

        # Many objects do not have `__getattr__`, this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if not accept:
            return False

        # We still need to check for overridden properties.
        value_class = type(value)
        if not hasattr(value_class, attr):
            return True

        class_attr_val = getattr(value_class, attr)
        if not isinstance(class_attr_val, property):
            return True

        # Properties in allowed types are ok (although we do not include any
        # properties in our default allow list currently).
        if value_class in self.allowed_getattr:
            return True  # pragma: no cover

        # Properties in subclasses of allowed types may be ok if not changed.
        # Every external entry is consulted: the set has no defined order, so
        # deciding on the first entry alone would make the answer depend on
        # iteration order.
        for module_name, *access_path in self.allowed_getattr_external:
            try:
                external_class = _get_external(module_name, access_path)
                external_class_attr_val = getattr(external_class, attr)
            except (KeyError, AttributeError):
                # module not imported, or this class has no such attribute
                continue
            if class_attr_val == external_class_attr_val:
                return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances unless it was modified."""
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=self.allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: tuple[str, ...], a, b=None):
        """Return whether every dunder in ``dunders`` is allow-listed for ``a`` (and ``b`` if given)."""
        objects = [a]
        if b is not None:
            objects.append(b)
        return all(
            _has_original_dunder(
                obj,
                allowed_types=self.allowed_operations,
                allowed_methods=self._operator_dunder_methods(dunder),
                allowed_external=self.allowed_operations_external,
                method_name=dunder,
            )
            for dunder in dunders
            for obj in objects
        )

    def _operator_dunder_methods(self, dunder: str) -> set[Callable]:
        """Return (and memoize) the implementations of ``dunder`` on the allowed operation types."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> set[Callable]:
        """Collect attribute ``name`` from each class, skipping classes where it is missing or falsy."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }

312 

313 

314class _DummyNamedTuple(NamedTuple): 

315 """Used internally to retrieve methods of named tuple instance.""" 

316 

317 

318class EvaluationContext(NamedTuple): 

319 #: Local namespace 

320 locals: dict 

321 #: Global namespace 

322 globals: dict 

323 #: Evaluation policy identifier 

324 evaluation: Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"] = ( 

325 "forbidden" 

326 ) 

327 #: Whether the evaluation of code takes place inside of a subscript. 

328 #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``. 

329 in_subscript: bool = False 

330 #: Auto import method 

331 auto_import: Callable[list[str], ModuleType] | None = None 

332 #: Overrides for evaluation policy 

333 policy_overrides: dict = {} 

334 

335 

336class _IdentitySubscript: 

337 """Returns the key itself when item is requested via subscript.""" 

338 

339 def __getitem__(self, key): 

340 return key 

341 

342 

# Shared instance whose subscript operation returns the key unchanged.
IDENTITY_SUBSCRIPT = _IdentitySubscript()
# Name under which that instance is placed in locals when subscript contents
# are evaluated.
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
# Empty signature, used when a callable's signature cannot be obtained.
UNKNOWN_SIGNATURE = Signature()
# Sentinel for "no value was produced"; distinct from a legitimate ``None``.
NOT_EVALUATED = object()

347 

348 

class GuardRejection(Exception):
    """Exception raised when guard rejects evaluation attempt."""

353 

354 

def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate provided code in the evaluation context.

    The policy named by ``context.evaluation`` decides how:

    - ``forbidden``: nothing is evaluated, :class:`GuardRejection` is raised;
    - ``dangerous``: the code is handed to the standard :func:`eval`;
    - anything else: the code is parsed and :func:`eval_node` walks the AST.

    When ``context.in_subscript`` is set, ``code`` is treated as the contents
    of a subscript; an empty string then evaluates to an empty tuple.
    """
    original_locals = context.locals

    if context.evaluation == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: not using `ast.literal_eval` as it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # slice syntax (`:`) is only valid inside a subscript, so the code is
        # wrapped in a subscript of a marker object that returns its key
        # as-is; the result is then the key, with no real `__getitem__`
        # having taken place
        if not code:
            return tuple()
        subscript_locals = original_locals.copy()
        subscript_locals[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = "".join((SUBSCRIPT_MARKER, "[", code, "]"))
        fields = context._asdict()
        fields["locals"] = subscript_locals
        context = EvaluationContext(**fields)

    if context.evaluation == "dangerous":
        # NOTE(security): unrestricted `eval`; only reached when the caller
        # explicitly selected the "dangerous" policy.
        return eval(code, context.globals, context.locals)

    parsed = ast.parse(code, mode="eval")
    return eval_node(parsed, context)

389 

390 

# Binary operator AST node -> dunder method names consulted for it.
BINARY_OP_DUNDERS: dict[type[ast.operator], tuple[str]] = {
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    ast.MatMult: ("__matmul__",),
}

# Comparison AST node -> dunder method names; the first entry is the one
# applied, the remaining ones are additionally checked by the policy.
COMP_OP_DUNDERS: dict[type[ast.cmpop], tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}

# Unary operator AST node -> dunder method names consulted for it.
UNARY_OP_DUNDERS: dict[type[ast.unaryop], tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    ast.Not: ("__not__",),
}

425 

426 

class ImpersonatingDuck:
    """Empty placeholder whose instances get their ``__class__`` reassigned,
    yielding an object of another class without running that class's ``__init__``."""

    # intentionally empty: the impersonation happens by overriding __class__

431 

432 

433class _Duck: 

434 """A dummy class used to create objects pretending to have given attributes""" 

435 

436 def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): 

437 self.attributes = attributes or {} 

438 self.items = items or {} 

439 

440 def __getattr__(self, attr: str): 

441 return self.attributes[attr] 

442 

443 def __hasattr__(self, attr: str): 

444 return attr in self.attributes 

445 

446 def __dir__(self): 

447 return [*dir(super), *self.attributes] 

448 

449 def __getitem__(self, key: str): 

450 return self.items[key] 

451 

452 def __hasitem__(self, key: str): 

453 return self.items[key] 

454 

455 def _ipython_key_completions_(self): 

456 return self.items.keys() 

457 

458 

459def _find_dunder(node_op, dunders) -> Union[tuple[str, ...], None]: 

460 dunder = None 

461 for op, candidate_dunder in dunders.items(): 

462 if isinstance(node_op, op): 

463 dunder = candidate_dunder 

464 return dunder 

465 

466 

def get_policy(context: EvaluationContext) -> EvaluationPolicy:
    """Build the effective policy for ``context``.

    A shallow copy of the policy registered under ``context.evaluation`` is
    taken and each entry of ``context.policy_overrides`` is applied to it as an
    attribute. Keys that are not attributes of the policy are reported with
    ``print`` and skipped.
    """
    effective = copy(EVALUATION_POLICIES[context.evaluation])

    for key, value in context.policy_overrides.items():
        if not hasattr(effective, key):
            print(f"Incorrect policy override key: {key}")
            continue
        setattr(effective, key, value)
    return effective

476 

477 

def _apply_binary_dunder(
    dunder: str,
    left,
    right,
    policy: EvaluationPolicy,
    context: EvaluationContext,
):
    """Apply ``dunder`` to ``left`` and ``right``, falling back to the reflected method.

    When ``left``'s method answers ``NotImplemented`` (as ``int.__add__`` does
    for a ``float`` operand), the reflected method on ``right`` (``__radd__``
    for ``__add__`` and so on) is tried, provided the policy allows it.

    Raises `GuardRejection` if the reflected method is not allowed and
    `TypeError` if neither operand implements the operation.
    """
    result = getattr(left, dunder)(right)
    if result is not NotImplemented:
        return result

    reflected = "__r" + dunder[2:]
    if not policy.can_operate((reflected,), right):
        raise GuardRejection(
            f"Operation (`{reflected}`) for",
            type(right),
            f"not allowed in {context.evaluation} mode",
        )
    reflected_method = getattr(right, reflected, None)
    if reflected_method is not None:
        result = reflected_method(left)
    if result is NotImplemented:
        # only type names are formatted, to avoid calling `repr` on the values
        raise TypeError(
            f"unsupported operand type(s) for {dunder}: "
            f"{type(left).__name__!r} and {type(right).__name__!r}"
        )
    return result


def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
    """Evaluate AST node in provided context.

    Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.

    Does not evaluate actions that always have side effects:

    - class definitions (``class sth: ...``)
    - function definitions (``def sth: ...``)
    - variable assignments (``x = 1``)
    - augmented assignments (``x += 1``)
    - deletions (``del x``)

    Does not evaluate operations which do not return values:

    - assertions (``assert x``)
    - pass (``pass``)
    - imports (``import x``)
    - control flow:

        - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
        - loops (``for`` and ``while``)
        - exception handling

    The purpose of this function is to guard against unwanted side-effects;
    it does not give guarantees on protection from malicious code execution.

    Raises `GuardRejection` when the policy forbids an operation and
    `ValueError` for node types that are not handled.
    """
    policy = get_policy(context)

    if node is None:
        return None
    if isinstance(node, ast.Expression):
        return eval_node(node.body, context)
    if isinstance(node, ast.BinOp):
        left = eval_node(node.left, context)
        right = eval_node(node.right, context)
        dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, left, right):
                return _apply_binary_dunder(dunders[0], left, right, policy, context)
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(left),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Compare):
        left = eval_node(node.left, context)
        all_true = True
        for op, right in zip(node.ops, node.comparators):
            right = eval_node(right, context)
            # Negation belongs to a single link of the chain; it is reset for
            # every link so that `a is not b is c` does not negate `b is c`.
            negate = False
            dunder = None
            dunders = _find_dunder(op, COMP_OP_DUNDERS)
            if not dunders:
                if isinstance(op, ast.NotIn):
                    dunders = COMP_OP_DUNDERS[ast.In]
                    negate = True
                if isinstance(op, ast.Is):
                    dunder = "is_"
                if isinstance(op, ast.IsNot):
                    dunder = "is_"
                    negate = True
            if not dunder and dunders:
                dunder = dunders[0]
            if dunder:
                # `x in y` is `y.__contains__(x)`, hence the swapped operands
                a, b = (right, left) if dunder == "__contains__" else (left, right)
                if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
                    result = getattr(operator, dunder)(a, b)
                    if negate:
                        result = not result
                    if not result:
                        all_true = False
                    left = right
                else:
                    raise GuardRejection(
                        f"Comparison (`{dunder}`) for",
                        type(left),
                        f"not allowed in {context.evaluation} mode",
                    )
            else:
                raise ValueError(
                    f"Comparison `{dunder}` not supported"
                )  # pragma: no cover
        return all_true
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Tuple):
        return tuple(eval_node(e, context) for e in node.elts)
    if isinstance(node, ast.List):
        return [eval_node(e, context) for e in node.elts]
    if isinstance(node, ast.Set):
        return {eval_node(e, context) for e in node.elts}
    if isinstance(node, ast.Dict):
        return dict(
            zip(
                [eval_node(k, context) for k in node.keys],
                [eval_node(v, context) for v in node.values],
            )
        )
    if isinstance(node, ast.Slice):
        return slice(
            eval_node(node.lower, context),
            eval_node(node.upper, context),
            eval_node(node.step, context),
        )
    if isinstance(node, ast.UnaryOp):
        value = eval_node(node.operand, context)
        dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, value):
                # Dispatch through `operator` (as the comparison branch does):
                # objects have no `__not__` method, whereas `operator.__not__`
                # exists alongside `__neg__`, `__pos__` and `__invert__`.
                return getattr(operator, dunders[0])(value)
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(value),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Subscript):
        value = eval_node(node.value, context)
        slice_ = eval_node(node.slice, context)
        if policy.can_get_item(value, slice_):
            return value[slice_]
        raise GuardRejection(
            "Subscript access (`__getitem__`) for",
            type(value),  # not joined to avoid calling `repr`
            f" not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.Name):
        return _eval_node_name(node.id, context)
    if isinstance(node, ast.Attribute):
        value = eval_node(node.value, context)
        if policy.can_get_attr(value, node.attr):
            return getattr(value, node.attr)
        raise GuardRejection(
            "Attribute access (`__getattr__`) for",
            type(value),  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.IfExp):
        test = eval_node(node.test, context)
        if test:
            return eval_node(node.body, context)
        else:
            return eval_node(node.orelse, context)
    if isinstance(node, ast.Call):
        func = eval_node(node.func, context)
        if policy.can_call(func) and not node.keywords:
            args = [eval_node(arg, context) for arg in node.args]
            return func(*args)
        if isclass(func):
            # this code path gets entered when calling class e.g. `MyClass()`
            # or `my_instance.__class__()` - in both cases `func` is `MyClass`.
            # Should return `MyClass` if `__new__` is not overridden,
            # otherwise whatever `__new__` return type is.
            overridden_return_type = _eval_return_type(func.__new__, node, context)
            if overridden_return_type is not NOT_EVALUATED:
                return overridden_return_type
            return _create_duck_for_heap_type(func)
        else:
            return_type = _eval_return_type(func, node, context)
            if return_type is not NOT_EVALUATED:
                return return_type
        raise GuardRejection(
            "Call for",
            func,  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    raise ValueError("Unhandled node", ast.dump(node))

647 

648 

def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext):
    """Evaluate return type of a given callable function.

    The return annotation is read from ``inspect.signature(func)`` and handed
    to ``_resolve_annotation``.

    Returns the built-in type, a duck or NOT_EVALUATED sentinel. The sentinel
    is returned when no signature can be obtained or when the signature has no
    return annotation.
    """
    try:
        sig = signature(func)
    except (ValueError, TypeError):
        # `signature` raises ValueError when no signature can be provided and
        # TypeError when the object is of an unsupported kind (for example a
        # non-callable); both simply mean the return type is unknown.
        sig = UNKNOWN_SIGNATURE
    # if annotation was not stringized, or it was stringized
    # but resolved by signature call we know the return type
    if sig.return_annotation is Signature.empty:
        return NOT_EVALUATED
    return _resolve_annotation(sig.return_annotation, sig, func, node, context)

664 

665 

def _resolve_annotation(
    annotation,
    sig: Signature,
    func: Callable,
    node: ast.Call,
    context: EvaluationContext,
):
    """Resolve annotation created by user with `typing` module and custom objects.

    A string annotation is first looked up as a name in the context. The
    annotation is then mapped to a representative value:

    - ``Self`` on a bound callable: the object it is bound to,
    - ``Literal`` with a single argument: that argument,
    - ``LiteralString``: an empty string,
    - ``AnyStr``: the evaluated call argument at the position of the first
      ``AnyStr`` parameter,
    - ``TypeGuard``: ``False``,
    - unions (``Union[...]`` and ``X | Y``): a duck exposing the attribute
      names of every member,
    - typed dicts and protocols: a duck,
    - ``Annotated``, ``NewType`` and type aliases: the underlying type,
    - anything else: an instance or a duck of the annotated type.

    Returns the NOT_EVALUATED sentinel when a branch cannot produce a value
    (for example a ``Literal`` with several arguments).
    """
    # function-scope import: this is the only place `UnionType` is needed
    from types import UnionType

    annotation = (
        _eval_node_name(annotation, context)
        if isinstance(annotation, str)
        else annotation
    )
    origin = get_origin(annotation)
    if annotation is Self and hasattr(func, "__self__"):
        return func.__self__
    elif origin is Literal:
        type_args = get_args(annotation)
        if len(type_args) == 1:
            return type_args[0]
    elif annotation is LiteralString:
        return ""
    elif annotation is AnyStr:
        index = None
        for i, (key, value) in enumerate(sig.parameters.items()):
            if value.annotation is AnyStr:
                index = i
                break
        if index is not None and index < len(node.args):
            return eval_node(node.args[index], context)
    elif origin is TypeGuard:
        return False
    elif origin is Union or origin is UnionType:
        # `get_origin(X | Y)` may be `types.UnionType` rather than
        # `typing.Union`, so both spellings are accepted.
        attributes = [
            attr
            for type_arg in get_args(annotation)
            for attr in dir(_resolve_annotation(type_arg, sig, func, node, context))
        ]
        return _Duck(attributes=dict.fromkeys(attributes))
    elif is_typeddict(annotation):
        return _Duck(
            attributes=dict.fromkeys(dir(dict())),
            items={
                k: _resolve_annotation(v, sig, func, node, context)
                for k, v in annotation.__annotations__.items()
            },
        )
    elif hasattr(annotation, "_is_protocol"):
        return _Duck(attributes=dict.fromkeys(dir(annotation)))
    elif origin is Annotated:
        type_arg = get_args(annotation)[0]
        return _resolve_annotation(type_arg, sig, func, node, context)
    elif isinstance(annotation, NewType):
        return _eval_or_create_duck(annotation.__supertype__, node, context)
    elif isinstance(annotation, TypeAliasType):
        return _eval_or_create_duck(annotation.__value__, node, context)
    else:
        return _eval_or_create_duck(annotation, node, context)
    # A branch matched but produced nothing: report that explicitly instead of
    # an implicit `None`, which callers would take for an evaluated value.
    return NOT_EVALUATED

724 

725 

def _eval_node_name(node_id: str, context: EvaluationContext):
    """Resolve the bare name ``node_id`` under the context's policy.

    Sources are tried in order - locals, globals, builtins, auto-import - each
    only if the policy enables it.

    Raises `GuardRejection` when the name is unresolved and neither locals nor
    globals access is allowed, `NameError` when it is unresolved otherwise.
    """
    policy = get_policy(context)
    locals_allowed = policy.allow_locals_access
    globals_allowed = policy.allow_globals_access

    if locals_allowed and node_id in context.locals:
        return context.locals[node_id]
    if globals_allowed and node_id in context.globals:
        return context.globals[node_id]
    if policy.allow_builtins_access and hasattr(builtins, node_id):
        # note: do not use __builtins__, it is implementation detail of cPython
        return getattr(builtins, node_id)
    if policy.allow_auto_import and context.auto_import:
        return context.auto_import(node_id)

    if globals_allowed or locals_allowed:
        raise NameError(f"{node_id} not found in locals, globals, nor builtins")
    raise GuardRejection(
        f"Namespace access not allowed in {context.evaluation} mode"
    )

743 

744 

def _eval_or_create_duck(duck_type, node: ast.Call, context: EvaluationContext):
    """Produce a value standing for an instance of ``duck_type``.

    If the policy allows calling ``duck_type`` and the call node carries no
    keyword arguments, the type is called with the evaluated positional
    arguments of ``node``; otherwise a duck is created for it.
    """
    policy = get_policy(context)
    callable_now = policy.can_call(duck_type) and not node.keywords
    if not callable_now:
        # custom class in the type annotation: mock it
        return _create_duck_for_heap_type(duck_type)
    # allow-listed builtin in the type annotation: instantiate it
    positional = [eval_node(arg, context) for arg in node.args]
    return duck_type(*positional)

753 

754 

def _create_duck_for_heap_type(duck_type):
    """Create an imitation of an object of a given type (a duck).

    A blank ``ImpersonatingDuck`` has its ``__class__`` reassigned to
    ``duck_type``.

    Returns the duck or NOT_EVALUATED sentinel if duck could not be created.
    """
    impostor = ImpersonatingDuck()
    try:
        # this only works for heap types, not builtins
        impostor.__class__ = duck_type
    except TypeError:
        return NOT_EVALUATED
    return impostor

768 

769 

# Types from optional third-party modules, each written as
# ``(module_name, *access_path)``.
SUPPORTED_EXTERNAL_GETITEM = {
    ("pandas", "core", "indexing", "_iLocIndexer"),
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    ("numpy", "ndarray"),
    ("numpy", "void"),
}

778 

779 

# Built-in and standard-library container types, plus two helper classes
# defined in this module.
BUILTIN_GETITEM: set[InstancesHaveGetItem] = {
    dict,
    str,  # type: ignore[arg-type]
    bytes,  # type: ignore[arg-type]
    list,
    tuple,
    collections.defaultdict,
    collections.deque,
    collections.OrderedDict,
    collections.ChainMap,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    _DummyNamedTuple,
    _IdentitySubscript,
}

796 

797 

798def _list_methods(cls, source=None): 

799 """For use on immutable objects or with methods returning a copy""" 

800 return [getattr(cls, k) for k in (source if source else dir(cls))] 

801 

802 

# Names of container methods listed as non-mutating.
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# For sets: every attribute name that `set` has in common with `frozenset`.
set_non_mutating_methods = set(dir(set)).intersection(dir(frozenset))


# Concrete class of the view object returned by `dict.keys()`.
dict_keys: type[collections.abc.KeysView] = type({}.keys())

NUMERICS = {int, float, complex}

811 

# Callables that may be invoked: constructors together with selected methods.
# Where a name tuple is passed to `_list_methods` only those methods are
# included; otherwise every attribute of the class is.
ALLOWED_CALLS = {
    bytes,
    *_list_methods(bytes),
    dict,
    *_list_methods(dict, dict_non_mutating_methods),
    dict_keys.isdisjoint,
    list,
    *_list_methods(list, list_non_mutating_methods),
    set,
    *_list_methods(set, set_non_mutating_methods),
    frozenset,
    *_list_methods(frozenset),
    range,
    str,
    *_list_methods(str),
    tuple,
    *_list_methods(tuple),
    *NUMERICS,
    *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
    collections.deque,
    *_list_methods(collections.deque, list_non_mutating_methods),
    collections.defaultdict,
    *_list_methods(collections.defaultdict, dict_non_mutating_methods),
    collections.OrderedDict,
    *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
    collections.UserDict,
    *_list_methods(collections.UserDict, dict_non_mutating_methods),
    collections.UserList,
    *_list_methods(collections.UserList, list_non_mutating_methods),
    collections.UserString,
    *_list_methods(collections.UserString, dir(str)),
    collections.Counter,
    *_list_methods(collections.Counter, dict_non_mutating_methods),
    collections.Counter.elements,
    collections.Counter.most_common,
}

848 

# Types for attribute access: everything in BUILTIN_GETITEM plus the entries
# below.
BUILTIN_GETATTR: set[MayHaveGetattr] = {
    *BUILTIN_GETITEM,
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    MethodDescriptorType,
    ModuleType,
}


# Types for operators: a separate set with the same members as BUILTIN_GETATTR.
BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}

863 

# Registry of policies keyed by the identifier stored in
# `EvaluationContext.evaluation`. There are no "forbidden" or "dangerous"
# entries here.
EVALUATION_POLICIES = {
    # builtins by name only: no namespaces, items, attributes, calls or operators
    "minimal": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=False,
        allow_globals_access=False,
        allow_item_access=False,
        allow_attr_access=False,
        allowed_calls=set(),
        allow_any_calls=False,
        allow_all_operations=False,
    ),
    # namespaces are readable; everything else goes through the allow-lists
    "limited": SelectivePolicy(
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
        allowed_operations=BUILTIN_OPERATIONS,
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allowed_calls=ALLOWED_CALLS,
    ),
    # every flag the evaluator consults is switched on
    "unsafe": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_any_calls=True,
        allow_all_operations=True,
    ),
}

900 

901 

# Names exported by `from ... import *`.
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]