#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import inspect
import os
import pickle
import shutil
import subprocess
import sys
import types
import warnings
from abc import ABCMeta, abstractmethod
from pathlib import Path
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Any, Callable, Collection, Iterable, Mapping, Sequence

import dill

from airflow.exceptions import AirflowConfigException, AirflowException, RemovedInAirflow3Warning
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.utils.context import Context, context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.version import version as airflow_version


def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs):
    """
    Deprecated function. Calls ``@task.python`` and allows users to turn a Python function into
    an Airflow task. Please use the following instead:

    from airflow.decorators import task

    @task
    def my_task():

    :param python_callable: A reference to an object that is callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function (templated)
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable (templated)
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. A dict will unroll to XCom values with its keys as XCom keys.
        Defaults to False.
    :return:
    """
    # To maintain backwards compatibility, we import the task object into this file
    # This prevents breakages in dags that use `from airflow.operators.python import task`
    from airflow.decorators.python import python_task

    warnings.warn(
        """airflow.operators.python.task is deprecated. Please use the following instead

        from airflow.decorators import task

        @task
        def my_task():""",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)


class PythonOperator(BaseOperator):
    """
    Executes a Python callable.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonOperator`

    When running your callable, Airflow will pass a set of keyword arguments that can be used in your
    function. This set of kwargs corresponds exactly to what you can use in your Jinja templates.
    For this to work, you need to define ``**kwargs`` in your function header, or you can directly add
    the keyword arguments you would like to get - for example, with the code below your callable will get
    the values of the ``ti`` and ``next_ds`` context variables.

    With explicit arguments:

    .. code-block:: python

        def my_python_callable(ti, next_ds):
            pass

    With kwargs:

    .. code-block:: python

        def my_python_callable(**kwargs):
            ti = kwargs["ti"]
            next_ds = kwargs["next_ds"]


    :param python_callable: A reference to an object that is callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied. (templated)
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param show_return_value_in_logs: whether to show the return value in the logs.
        Defaults to True, which allows return value log output.
        It can be set to False to prevent log output of the return value when you return huge data
        such as transmitting a large amount of XCom data to TaskAPI.
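
    A minimal usage sketch (the task id, callable, and values are illustrative):

    .. code-block:: python

        def add(x, y):
            return x + y


        add_task = PythonOperator(
            task_id="add_numbers",
            python_callable=add,
            op_kwargs={"x": 1, "y": 2},
        )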

129 """ 

130 

131 template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs") 

132 template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} 

133 BLUE = "#ffefeb" 

134 ui_color = BLUE 

135 

136 # since we won't mutate the arguments, we should just do the shallow copy 

137 # there are some cases we can't deepcopy the objects(e.g protobuf). 

138 shallow_copy_attrs: Sequence[str] = ( 

139 "python_callable", 

140 "op_kwargs", 

141 ) 


    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict[str, Any] | None = None,
        templates_exts: Sequence[str] | None = None,
        show_return_value_in_logs: bool = True,
        **kwargs,
    ) -> None:
        if kwargs.get("provide_context"):
            warnings.warn(
                "provide_context is deprecated as of 2.0 and is no longer required",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            kwargs.pop("provide_context", None)
        super().__init__(**kwargs)
        if not callable(python_callable):
            raise AirflowException("`python_callable` param must be callable")
        self.python_callable = python_callable
        self.op_args = op_args or ()
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts
        self.show_return_value_in_logs = show_return_value_in_logs

    def execute(self, context: Context) -> Any:
        context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
        self.op_kwargs = self.determine_kwargs(context)

        return_value = self.execute_callable()
        if self.show_return_value_in_logs:
            self.log.info("Done. Returned value was: %s", return_value)
        else:
            self.log.info("Done. Returned value not shown")

        return return_value

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking()

    def execute_callable(self) -> Any:
        """
        Calls the python callable with the given arguments.

        :return: the return value of the call.
        """
        return self.python_callable(*self.op_args, **self.op_kwargs)


class BranchPythonOperator(PythonOperator, SkipMixin):
    """
    A workflow can "branch" or follow a path after the execution of this task.

    It derives from PythonOperator and expects a Python function that returns
    a single task_id or a list of task_ids to follow. The task_id(s) returned
    should point to a task directly downstream from {self}. All other "branches"
    or directly downstream tasks are marked with a state of ``skipped`` so that
    these paths can't move forward. The ``skipped`` states are propagated
    downstream to allow for the DAG state to fill up and the DAG run's state
    to be inferred.
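
    A minimal branch-callable sketch (the task ids and the ``fast_path`` param are illustrative):

    .. code-block:: python

        def choose_branch(**kwargs):
            # Return the task_id (or list of task_ids) of the branch(es) to follow;
            # all other directly downstream tasks will be skipped.
            if kwargs["params"].get("fast_path"):
                return "fast_task"
            return ["slow_task", "audit_task"]


        branching = BranchPythonOperator(task_id="branching", python_callable=choose_branch)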

207 """ 

208 

209 def execute(self, context: Context) -> Any: 

210 branch = super().execute(context) 

211 self.log.info("Branch callable return %s", branch) 

212 self.skip_all_except(context["ti"], branch) 

213 return branch 

214 

215 



class ShortCircuitOperator(PythonOperator, SkipMixin):
    """
    Allows a pipeline to continue based on the result of a ``python_callable``.

    The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a
    ``python_callable``. If the returned result is False or a falsy value, the pipeline will be
    short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting
    mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and
    an ``XCom`` of the returned result is pushed.

    The short-circuiting can be configured to either respect or ignore the ``trigger_rule`` set for
    downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default setting, all
    downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks. However, if this
    parameter is set to False, the direct downstream tasks are skipped but the specified ``trigger_rule`` for
    other subsequent downstream tasks is respected. In this mode, the operator assumes the direct downstream
    tasks were purposely meant to be skipped but perhaps not other subsequent tasks.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ShortCircuitOperator`

    :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will
        be skipped. This is the default behavior. If set to False, the direct downstream task(s) will be
        skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected.
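
    A minimal sketch (the task id and the condition are illustrative):

    .. code-block:: python

        def is_weekday(ds=None, **kwargs):
            # Truthy return value -> downstream tasks run;
            # falsy return value -> downstream tasks are skipped.
            import datetime

            return datetime.datetime.strptime(ds, "%Y-%m-%d").weekday() < 5


        weekday_gate = ShortCircuitOperator(task_id="weekday_gate", python_callable=is_weekday)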

240 """ 

241 

242 def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> None: 

243 super().__init__(**kwargs) 

244 self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules 

245 

246 def execute(self, context: Context) -> Any: 

247 condition = super().execute(context) 

248 self.log.info("Condition result is %s", condition) 

249 

250 if condition: 

251 self.log.info("Proceeding with downstream tasks...") 

252 return condition 

253 

254 downstream_tasks = context["task"].get_flat_relatives(upstream=False) 

255 self.log.debug("Downstream task IDs %s", downstream_tasks) 

256 

257 if downstream_tasks: 

258 dag_run = context["dag_run"] 

259 execution_date = dag_run.execution_date 

260 

261 if self.ignore_downstream_trigger_rules is True: 

262 self.log.info("Skipping all downstream tasks...") 

263 self.skip(dag_run, execution_date, downstream_tasks) 

264 else: 

265 self.log.info("Skipping downstream tasks while respecting trigger rules...") 

266 # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the 

267 # Scheduler handle the remaining downstream task(s) appropriately. 

268 self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False)) 

269 

270 self.log.info("Done.") 

271 

272 



class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
    BASE_SERIALIZABLE_CONTEXT_KEYS = {
        "ds",
        "ds_nodash",
        "expanded_ti_count",
        "inlets",
        "next_ds",
        "next_ds_nodash",
        "outlets",
        "prev_ds",
        "prev_ds_nodash",
        "run_id",
        "task_instance_key_str",
        "test_mode",
        "tomorrow_ds",
        "tomorrow_ds_nodash",
        "ts",
        "ts_nodash",
        "ts_nodash_with_tz",
        "yesterday_ds",
        "yesterday_ds_nodash",
    }
    PENDULUM_SERIALIZABLE_CONTEXT_KEYS = {
        "data_interval_end",
        "data_interval_start",
        "execution_date",
        "logical_date",
        "next_execution_date",
        "prev_data_interval_end_success",
        "prev_data_interval_start_success",
        "prev_execution_date",
        "prev_execution_date_success",
        "prev_start_date_success",
    }
    AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {
        "macros",
        "conf",
        "dag",
        "dag_run",
        "task",
        "params",
        "triggering_dataset_events",
    }

    def __init__(
        self,
        *,
        python_callable: Callable,
        use_dill: bool = False,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        **kwargs,
    ):
        # Reject anything that is not a plain function (e.g. methods, builtins) as well as
        # lambdas; note that `and` binds tighter than `or` in the check below.
        if (
            not isinstance(python_callable, types.FunctionType)
            or isinstance(python_callable, types.LambdaType)
            and python_callable.__name__ == "<lambda>"
        ):
            raise AirflowException("PythonVirtualenvOperator only supports functions for python_callable arg")
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs,
        )
        self.string_args = string_args or []
        self.use_dill = use_dill
        self.pickling_library = dill if self.use_dill else pickle
        self.expect_airflow = expect_airflow

    @abstractmethod
    def _iter_serializable_context_keys(self):
        pass

    def execute(self, context: Context) -> Any:
        serializable_keys = set(self._iter_serializable_context_keys())
        serializable_context = context_copy_partial(context, serializable_keys)
        return super().execute(context=serializable_context)

    def get_python_source(self):
        """Return the source of self.python_callable."""
        return dedent(inspect.getsource(self.python_callable))

    def _write_args(self, file: Path):
        if self.op_args or self.op_kwargs:
            file.write_bytes(self.pickling_library.dumps({"args": self.op_args, "kwargs": self.op_kwargs}))

    def _write_string_args(self, file: Path):
        file.write_text("\n".join(map(str, self.string_args)))

    def _read_result(self, path: Path):
        if path.stat().st_size == 0:
            return None
        try:
            return self.pickling_library.loads(path.read_bytes())
        except ValueError:
            self.log.error(
                "Error deserializing result. Note that result deserialization "
                "is not supported across major Python versions."
            )
            raise

    def __deepcopy__(self, memo):
        # module objects can't be copied _at all_
        memo[id(self.pickling_library)] = self.pickling_library
        return super().__deepcopy__(memo)

    def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Path):
        op_kwargs: dict[str, Any] = {k: v for k, v in self.op_kwargs.items()}
        if self.templates_dict:
            op_kwargs["templates_dict"] = self.templates_dict
        # The callable's source, its (pickled) arguments, and the string args are written to
        # files in tmp_dir; the generated script reads them back and writes its result to script.out.
        input_path = tmp_dir / "script.in"
        output_path = tmp_dir / "script.out"
        string_args_path = tmp_dir / "string_args.txt"
        script_path = tmp_dir / "script.py"
        self._write_args(input_path)
        self._write_string_args(string_args_path)
        write_python_script(
            jinja_context=dict(
                op_args=self.op_args,
                op_kwargs=op_kwargs,
                expect_airflow=self.expect_airflow,
                pickling_library=self.pickling_library.__name__,
                python_callable=self.python_callable.__name__,
                python_callable_source=self.get_python_source(),
            ),
            filename=os.fspath(script_path),
            render_template_as_native_obj=self.dag.render_template_as_native_obj,
        )

        execute_in_subprocess(
            cmd=[
                os.fspath(python_path),
                os.fspath(script_path),
                os.fspath(input_path),
                os.fspath(output_path),
                os.fspath(string_args_path),
            ]
        )
        return self._read_result(output_path)

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing()


class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a virtualenv that is created and destroyed automatically.

    The function (with certain caveats) must be defined using ``def``, and not be
    part of a class. All imports must happen inside the function
    and no variables outside the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are being provided to
    Airflow through plugins. You can use string_args though.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonVirtualenvOperator`

    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :param requirements: Either a list of requirement strings, or a (templated)
        "requirements file" as specified by pip.
    :param python_version: The Python version to run the virtualenv with. Note that
        both 2 and 2.7 are acceptable forms.
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is default). This allows more complex types
        but requires you to include dill in your requirements.
    :param system_site_packages: Whether to include
        system_site_packages in your virtualenv.
        See virtualenv documentation for more information.
    :param pip_install_options: a list of pip install options when installing requirements
        See 'pip install -h' for available options
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
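
    A minimal sketch (the task id and the requirement pin are illustrative):

    .. code-block:: python

        def callable_venv():
            # All imports must happen inside the function.
            from colorama import Fore

            print(Fore.GREEN + "hello from the virtualenv")


        venv_task = PythonVirtualenvOperator(
            task_id="virtualenv_python",
            python_callable=callable_venv,
            requirements=["colorama==0.4.0"],
            system_site_packages=False,
        )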

470 """ 

471 

472 template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields)) 

473 template_ext: Sequence[str] = (".txt",) 

474 

475 def __init__( 

476 self, 

477 *, 

478 python_callable: Callable, 

479 requirements: None | Iterable[str] | str = None, 

480 python_version: str | int | float | None = None, 

481 use_dill: bool = False, 

482 system_site_packages: bool = True, 

483 pip_install_options: list[str] | None = None, 

484 op_args: Collection[Any] | None = None, 

485 op_kwargs: Mapping[str, Any] | None = None, 

486 string_args: Iterable[str] | None = None, 

487 templates_dict: dict | None = None, 

488 templates_exts: list[str] | None = None, 

489 expect_airflow: bool = True, 

490 **kwargs, 

491 ): 

492 if ( 

493 python_version 

494 and str(python_version)[0] != str(sys.version_info.major) 

495 and (op_args or op_kwargs) 

496 ): 

497 raise AirflowException( 

498 "Passing op_args or op_kwargs is not supported across different Python " 

499 "major versions for PythonVirtualenvOperator. Please use string_args." 

500 f"Sys version: {sys.version_info}. Venv version: {python_version}" 

501 ) 

502 if not shutil.which("virtualenv"): 

503 raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.") 

504 if not requirements: 

505 self.requirements: list[str] | str = [] 

506 elif isinstance(requirements, str): 

507 self.requirements = requirements 

508 else: 

509 self.requirements = list(requirements) 

510 self.python_version = python_version 

511 self.system_site_packages = system_site_packages 

512 self.pip_install_options = pip_install_options 

513 super().__init__( 

514 python_callable=python_callable, 

515 use_dill=use_dill, 

516 op_args=op_args, 

517 op_kwargs=op_kwargs, 

518 string_args=string_args, 

519 templates_dict=templates_dict, 

520 templates_exts=templates_exts, 

521 expect_airflow=expect_airflow, 

522 **kwargs, 

523 ) 


    def execute_callable(self):
        with TemporaryDirectory(prefix="venv") as tmp_dir:
            tmp_path = Path(tmp_dir)
            requirements_file_name = f"{tmp_dir}/requirements.txt"

            if not isinstance(self.requirements, str):
                requirements_file_contents = "\n".join(str(dependency) for dependency in self.requirements)
            else:
                requirements_file_contents = self.requirements

            if not self.system_site_packages and self.use_dill:
                requirements_file_contents += "\ndill"

            with open(requirements_file_name, "w") as file:
                file.write(requirements_file_contents)
            prepare_virtualenv(
                venv_directory=tmp_dir,
                python_bin=f"python{self.python_version}" if self.python_version else None,
                system_site_packages=self.system_site_packages,
                requirements_file_path=requirements_file_name,
                pip_install_options=self.pip_install_options,
            )
            python_path = tmp_path / "bin" / "python"

            return self._execute_python_callable_in_subprocess(python_path, tmp_path)

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self.system_site_packages or "apache-airflow" in self.requirements:
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif "pendulum" in self.requirements:
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS


class ExternalPythonOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in an existing virtualenv that is not re-created.

    The virtualenv is reused as is, without the overhead of creating it (with certain caveats).

    The function must be defined using ``def``, and not be
    part of a class. All imports must happen inside the function
    and no variables outside the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are being provided to
    Airflow through plugins. You can use string_args though.

    If Airflow is installed in the external environment in a different version than the version
    used by the operator, the operator will fail.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ExternalPythonOperator`

    :param python: Full path string (file-system specific) that points to a Python binary inside
        a virtualenv that should be used (in ``VENV/bin`` folder). Should be an absolute path
        (so usually starts with "/" or "X:/" depending on the filesystem/os used).
    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is default). This allows more complex types
        but if dill is not preinstalled in your venv, the task will fail with use_dill enabled.
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
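
    A minimal sketch (the task id and the interpreter path are illustrative; the path must point
    at the ``bin/python`` of an existing venv):

    .. code-block:: python

        def callable_external():
            import sys

            print(f"running under {sys.executable}")


        external_task = ExternalPythonOperator(
            task_id="external_python",
            python="/opt/venvs/reporting/bin/python",
            python_callable=callable_external,
        )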

605 """ 

606 

607 template_fields: Sequence[str] = tuple({"python"} | set(PythonOperator.template_fields)) 

608 

609 def __init__( 

610 self, 

611 *, 

612 python: str, 

613 python_callable: Callable, 

614 use_dill: bool = False, 

615 op_args: Collection[Any] | None = None, 

616 op_kwargs: Mapping[str, Any] | None = None, 

617 string_args: Iterable[str] | None = None, 

618 templates_dict: dict | None = None, 

619 templates_exts: list[str] | None = None, 

620 expect_airflow: bool = True, 

621 expect_pendulum: bool = False, 

622 **kwargs, 

623 ): 

624 if not python: 

625 raise ValueError("Python Path must be defined in ExternalPythonOperator") 

626 self.python = python 

627 self.expect_pendulum = expect_pendulum 

628 super().__init__( 

629 python_callable=python_callable, 

630 use_dill=use_dill, 

631 op_args=op_args, 

632 op_kwargs=op_kwargs, 

633 string_args=string_args, 

634 templates_dict=templates_dict, 

635 templates_exts=templates_exts, 

636 expect_airflow=expect_airflow, 

637 **kwargs, 

638 ) 

639 

640 def execute_callable(self): 

641 python_path = Path(self.python) 

642 if not python_path.exists(): 

643 raise ValueError(f"Python Path '{python_path}' must exists") 

644 if not python_path.is_file(): 

645 raise ValueError(f"Python Path '{python_path}' must be a file") 

646 if not python_path.is_absolute(): 

647 raise ValueError(f"Python Path '{python_path}' must be an absolute path.") 

648 python_version_as_list_of_strings = self._get_python_version_from_environment() 

649 if ( 

650 python_version_as_list_of_strings 

651 and str(python_version_as_list_of_strings[0]) != str(sys.version_info.major) 

652 and (self.op_args or self.op_kwargs) 

653 ): 

654 raise AirflowException( 

655 "Passing op_args or op_kwargs is not supported across different Python " 

656 "major versions for ExternalPythonOperator. Please use string_args." 

657 f"Sys version: {sys.version_info}. Venv version: {python_version_as_list_of_strings}" 

658 ) 

659 with TemporaryDirectory(prefix="tmd") as tmp_dir: 

660 tmp_path = Path(tmp_dir) 

661 return self._execute_python_callable_in_subprocess(python_path, tmp_path) 

662 

663 def _get_python_version_from_environment(self) -> list[str]: 

664 try: 

665 result = subprocess.check_output([self.python, "--version"], text=True) 

666 return result.strip().split(" ")[-1].split(".") 

667 except Exception as e: 

668 raise ValueError(f"Error while executing {self.python}: {e}") 


    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self._get_airflow_version_from_target_env():
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif self._is_pendulum_installed_in_target_env():
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS

    def _is_pendulum_installed_in_target_env(self) -> bool:
        try:
            subprocess.check_call([self.python, "-c", "import pendulum"])
            return True
        except Exception as e:
            if self.expect_pendulum:
                self.log.warning("When checking for Pendulum installed in venv got %s", e)
                self.log.warning(
                    "Pendulum is not properly installed in the virtualenv. "
                    "Pendulum context keys will not be available. "
                    "Please install Pendulum or Airflow in your venv to access them."
                )
            return False

    def _get_airflow_version_from_target_env(self) -> str | None:
        try:
            result = subprocess.check_output(
                [self.python, "-c", "from airflow import version; print(version.version)"], text=True
            )
            target_airflow_version = result.strip()
            if target_airflow_version != airflow_version:
                raise AirflowConfigException(
                    f"The version of Airflow installed for the {self.python} ("
                    f"{target_airflow_version}) is different from the runtime Airflow version: "
                    f"{airflow_version}. Make sure your environment has the same Airflow version "
                    f"installed as the Airflow runtime."
                )
            return target_airflow_version
        except Exception as e:
            if self.expect_airflow:
                self.log.warning("When checking for Airflow installed in venv got %s", e)
                self.log.warning(
                    f"This means that Airflow is not properly installed by "
                    f"{self.python}. Airflow context keys will not be available. "
                    f"Please install Airflow {airflow_version} in your environment to access them."
                )
            return None


def get_current_context() -> Context:
    """
    Retrieve the execution context dictionary without altering the user method's signature.
    This is the simplest method of retrieving the execution context dictionary.

    **Old style:**

    .. code:: python

        def my_task(**context):
            ti = context["ti"]

    **New style:**

    .. code:: python

        from airflow.operators.python import get_current_context


        def my_task():
            context = get_current_context()
            ti = context["ti"]

    Current context will only have a value if this method is called after an operator
    has started to execute.
    """
    if not _CURRENT_CONTEXT:
        raise AirflowException(
            "Current context was requested but no context was found! "
            "Are you running within an airflow task?"
        )
    return _CURRENT_CONTEXT[-1]