Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/operators/python.py: 32%

244 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import inspect
import os
import pickle
import shutil
import subprocess
import sys
import types
import warnings
from abc import ABCMeta, abstractmethod
from collections.abc import Container
from pathlib import Path
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Any, Callable, Collection, Iterable, Mapping, Sequence

import dill

from airflow.exceptions import (
    AirflowConfigException,
    AirflowException,
    AirflowSkipException,
    DeserializingResultError,
    RemovedInAirflow3Warning,
)
from airflow.models.baseoperator import BaseOperator
from airflow.models.skipmixin import SkipMixin
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.utils.context import Context, context_copy_partial, context_merge
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script


def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs):
    """Deprecated. Use :func:`airflow.decorators.task` instead.

    Calls ``@task.python`` and allows users to turn a Python function into
    an Airflow task.

    :param python_callable: A reference to an object that is callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function (templated)
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable (templated)
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
        Defaults to False.
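
    A minimal sketch of the recommended replacement (the task name is illustrative):

    .. code-block:: python

        from airflow.decorators import task


        @task
        def my_task():
            ...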

67 """ 

68 # To maintain backwards compatibility, we import the task object into this file 

69 # This prevents breakages in dags that use `from airflow.operators.python import task` 

70 from airflow.decorators.python import python_task 

71 

72 warnings.warn( 

73 """airflow.operators.python.task is deprecated. Please use the following instead 

74 

75 from airflow.decorators import task 

76 @task 

77 def my_task()""", 

78 RemovedInAirflow3Warning, 

79 stacklevel=2, 

80 ) 

81 return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs) 

82 

83 

84class PythonOperator(BaseOperator): 

85 """ 

86 Executes a Python callable. 

87 

88 .. seealso:: 

89 For more information on how to use this operator, take a look at the guide: 

90 :ref:`howto/operator:PythonOperator` 

91 

    When running your callable, Airflow will pass a set of keyword arguments that can be used in your
    function. This set of kwargs corresponds exactly to what you can use in your jinja templates.
    For this to work, you need to define ``**kwargs`` in your function header, or you can directly add
    the keyword arguments you would like to receive - for example, with the code below your callable
    will receive the values of the ``ti`` and ``next_ds`` context variables.

    With explicit arguments:

    .. code-block:: python

        def my_python_callable(ti, next_ds):
            pass

    With kwargs:

    .. code-block:: python

        def my_python_callable(**kwargs):
            ti = kwargs["ti"]
            next_ds = kwargs["next_ds"]


    :param python_callable: A reference to an object that is callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied. (templated)
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param show_return_value_in_logs: a bool value whether to show return_value
        logs. Defaults to True, which allows return value log output.
        It can be set to False to prevent log output of the return value when you return huge data
        such as transmitting a large amount of XCom data to TaskAPI.
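
    A minimal usage sketch (the task id and callable below are illustrative):

    .. code-block:: python

        def print_context(ds=None, **kwargs):
            print(ds)


        run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)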

129 """ 

130 

131 template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs") 

132 template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} 

133 BLUE = "#ffefeb" 

134 ui_color = BLUE 

135 

    # Since we won't mutate the arguments, we should just do the shallow copy;
    # there are some cases where we can't deepcopy the objects (e.g. protobuf).
    shallow_copy_attrs: Sequence[str] = (
        "python_callable",
        "op_kwargs",
    )

    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict[str, Any] | None = None,
        templates_exts: Sequence[str] | None = None,
        show_return_value_in_logs: bool = True,
        **kwargs,
    ) -> None:
        if kwargs.get("provide_context"):
            warnings.warn(
                "provide_context is deprecated as of 2.0 and is no longer required",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            kwargs.pop("provide_context", None)
        super().__init__(**kwargs)
        if not callable(python_callable):
            raise AirflowException("`python_callable` param must be callable")
        self.python_callable = python_callable
        self.op_args = op_args or ()
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts
        self.show_return_value_in_logs = show_return_value_in_logs

    def execute(self, context: Context) -> Any:
        context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
        self.op_kwargs = self.determine_kwargs(context)

        return_value = self.execute_callable()
        if self.show_return_value_in_logs:
            self.log.info("Done. Returned value was: %s", return_value)
        else:
            self.log.info("Done. Returned value not shown")

        return return_value

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).unpacking()

    def execute_callable(self) -> Any:
        """
        Calls the python callable with the given arguments.

        :return: the return value of the call.
        """
        return self.python_callable(*self.op_args, **self.op_kwargs)


class BranchPythonOperator(PythonOperator, SkipMixin):
    """
    A workflow can "branch" or follow a path after the execution of this task.

    It derives from the PythonOperator and expects a Python function that returns
    a single task_id or a list of task_ids to follow. The task_id(s) returned
    should point to a task directly downstream from {self}. All other "branches"
    or directly downstream tasks are marked with a state of ``skipped`` so that
    these paths can't move forward. The ``skipped`` states are propagated
    downstream to allow for the DAG state to fill up and the DAG run's state
    to be inferred.
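
    A minimal sketch of a branch callable (the task ids and param key are illustrative):

    .. code-block:: python

        def choose_branch(**kwargs):
            if kwargs["params"].get("fast_path"):
                return "fast_task"
            return "slow_task"


        branch = BranchPythonOperator(task_id="branch", python_callable=choose_branch)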

207 """ 

208 

209 def execute(self, context: Context) -> Any: 

210 branch = super().execute(context) 

        self.log.info("Branch callable returned %s", branch)
        self.skip_all_except(context["ti"], branch)
        return branch


class ShortCircuitOperator(PythonOperator, SkipMixin):
    """
    Allows a pipeline to continue based on the result of a ``python_callable``.

    The ShortCircuitOperator is derived from the PythonOperator and evaluates the result of a
    ``python_callable``. If the returned result is False or a falsy value, the pipeline will be
    short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting
    mode configured. If the returned result is True or a truthy value, downstream tasks proceed as normal and
    an ``XCom`` of the returned result is pushed.

    The short-circuiting can be configured to either respect or ignore the ``trigger_rule`` set for
    downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default setting, all
    downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks. However, if this
    parameter is set to False, the direct downstream tasks are skipped but the specified ``trigger_rule`` for
    other subsequent downstream tasks is respected. In this mode, the operator assumes the direct downstream
    tasks were purposely meant to be skipped but perhaps not other subsequent tasks.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ShortCircuitOperator`

    :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will
        be skipped. This is the default behavior. If set to False, the direct, downstream task(s) will be
        skipped but the ``trigger_rule`` defined for all other downstream tasks will be respected.
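
    A minimal usage sketch (the task id is illustrative and ``data_is_ready`` is a
    hypothetical callable):

    .. code-block:: python

        def data_is_ready():
            # truthy -> downstream tasks run, falsy -> downstream tasks are skipped
            return True


        gate = ShortCircuitOperator(task_id="gate", python_callable=data_is_ready)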

240 """ 

241 

242 def __init__(self, *, ignore_downstream_trigger_rules: bool = True, **kwargs) -> None: 

243 super().__init__(**kwargs) 

244 self.ignore_downstream_trigger_rules = ignore_downstream_trigger_rules 

245 

246 def execute(self, context: Context) -> Any: 

247 condition = super().execute(context) 

248 self.log.info("Condition result is %s", condition) 

249 

250 if condition: 

251 self.log.info("Proceeding with downstream tasks...") 

252 return condition 

253 

254 downstream_tasks = context["task"].get_flat_relatives(upstream=False) 

255 self.log.debug("Downstream task IDs %s", downstream_tasks) 

256 

257 if downstream_tasks: 

258 dag_run = context["dag_run"] 

259 execution_date = dag_run.execution_date 

260 

261 if self.ignore_downstream_trigger_rules is True: 

262 self.log.info("Skipping all downstream tasks...") 

263 self.skip(dag_run, execution_date, downstream_tasks, map_index=context["ti"].map_index) 

264 else: 

265 self.log.info("Skipping downstream tasks while respecting trigger rules...") 

266 # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the 

267 # Scheduler handle the remaining downstream task(s) appropriately. 

268 self.skip( 

269 dag_run, 

270 execution_date, 

271 context["task"].get_direct_relatives(upstream=False), 

272 map_index=context["ti"].map_index, 

273 ) 

274 

275 self.log.info("Done.") 



class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
    BASE_SERIALIZABLE_CONTEXT_KEYS = {
        "ds",
        "ds_nodash",
        "expanded_ti_count",
        "inlets",
        "next_ds",
        "next_ds_nodash",
        "outlets",
        "prev_ds",
        "prev_ds_nodash",
        "run_id",
        "task_instance_key_str",
        "test_mode",
        "tomorrow_ds",
        "tomorrow_ds_nodash",
        "ts",
        "ts_nodash",
        "ts_nodash_with_tz",
        "yesterday_ds",
        "yesterday_ds_nodash",
    }
    PENDULUM_SERIALIZABLE_CONTEXT_KEYS = {
        "data_interval_end",
        "data_interval_start",
        "execution_date",
        "logical_date",
        "next_execution_date",
        "prev_data_interval_end_success",
        "prev_data_interval_start_success",
        "prev_execution_date",
        "prev_execution_date_success",
        "prev_start_date_success",
    }
    AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {
        "macros",
        "conf",
        "dag",
        "dag_run",
        "task",
        "params",
        "triggering_dataset_events",
    }

    def __init__(
        self,
        *,
        python_callable: Callable,
        use_dill: bool = False,
        op_args: Collection[Any] | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        string_args: Iterable[str] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        expect_airflow: bool = True,
        skip_on_exit_code: int | Container[int] | None = None,
        **kwargs,
    ):
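        # NOTE: in the check below, `and` binds tighter than `or`, so this rejects
        # anything that is not a plain function, as well as actual lambdas
        # (types.LambdaType is an alias of FunctionType, hence the __name__ check).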

        if (
            not isinstance(python_callable, types.FunctionType)
            or isinstance(python_callable, types.LambdaType)
            and python_callable.__name__ == "<lambda>"
        ):
            raise AirflowException("PythonVirtualenvOperator only supports functions for python_callable arg")
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs,
        )
        self.string_args = string_args or []
        self.use_dill = use_dill
        self.pickling_library = dill if self.use_dill else pickle
        self.expect_airflow = expect_airflow
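        # The expression below normalizes skip_on_exit_code to a container: a given
        # container is kept as-is, a single exit code is wrapped in a list, and
        # anything falsy (e.g. None) falls back to an empty list.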

        self.skip_on_exit_code = (
            skip_on_exit_code
            if isinstance(skip_on_exit_code, Container)
            else [skip_on_exit_code]
            if skip_on_exit_code
            else []
        )

    @abstractmethod
    def _iter_serializable_context_keys(self):
        pass

    def execute(self, context: Context) -> Any:
        serializable_keys = set(self._iter_serializable_context_keys())
        serializable_context = context_copy_partial(context, serializable_keys)
        return super().execute(context=serializable_context)

    def get_python_source(self):
        """Return the source of self.python_callable."""
        return dedent(inspect.getsource(self.python_callable))

    def _write_args(self, file: Path):
        if self.op_args or self.op_kwargs:
            file.write_bytes(self.pickling_library.dumps({"args": self.op_args, "kwargs": self.op_kwargs}))

    def _write_string_args(self, file: Path):
        file.write_text("\n".join(map(str, self.string_args)))

    def _read_result(self, path: Path):
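        # NOTE: an empty output file below means the subprocess wrote no result,
        # so it is treated as a None return value.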

        if path.stat().st_size == 0:
            return None
        try:
            return self.pickling_library.loads(path.read_bytes())
        except ValueError as value_error:
            raise DeserializingResultError() from value_error

    def __deepcopy__(self, memo):
        # module objects can't be copied *at all*
        memo[id(self.pickling_library)] = self.pickling_library
        return super().__deepcopy__(memo)

    def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Path):
        op_kwargs: dict[str, Any] = {k: v for k, v in self.op_kwargs.items()}
        if self.templates_dict:
            op_kwargs["templates_dict"] = self.templates_dict
        input_path = tmp_dir / "script.in"
        output_path = tmp_dir / "script.out"
        string_args_path = tmp_dir / "string_args.txt"
        script_path = tmp_dir / "script.py"
        self._write_args(input_path)
        self._write_string_args(string_args_path)
        write_python_script(
            jinja_context=dict(
                op_args=self.op_args,
                op_kwargs=op_kwargs,
                expect_airflow=self.expect_airflow,
                pickling_library=self.pickling_library.__name__,
                python_callable=self.python_callable.__name__,
                python_callable_source=self.get_python_source(),
            ),
            filename=os.fspath(script_path),
            render_template_as_native_obj=self.dag.render_template_as_native_obj,
        )

        try:
            execute_in_subprocess(
                cmd=[
                    os.fspath(python_path),
                    os.fspath(script_path),
                    os.fspath(input_path),
                    os.fspath(output_path),
                    os.fspath(string_args_path),
                ]
            )
        except subprocess.CalledProcessError as e:
            if e.returncode in self.skip_on_exit_code:
                raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
            else:
                raise

        return self._read_result(output_path)

    def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        return KeywordParameters.determine(self.python_callable, self.op_args, context).serializing()


class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a virtualenv that is created and destroyed automatically.

    The function must be defined using def (certain caveats apply), and must not be
    part of a class. All imports must happen inside the function,
    and no variables outside of the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are provided to
    Airflow through plugins. You can use string_args though.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:PythonVirtualenvOperator`

    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :param requirements: Either a list of requirement strings, or a (templated)
        "requirements file" as specified by pip.
    :param python_version: The Python version to run the virtualenv with. Note that
        both 2 and 2.7 are acceptable forms.
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is the default). This allows more complex types
        but requires you to include dill in your requirements.
    :param system_site_packages: Whether to include
        system_site_packages in your virtualenv.
        See virtualenv documentation for more information.
    :param pip_install_options: a list of pip install options when installing requirements.
        See 'pip install -h' for available options.
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
    :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.
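
    A minimal usage sketch (the task id, callable, and requirement pin are illustrative):

    .. code-block:: python

        def callable_virtualenv():
            from colorama import Fore

            print(Fore.GREEN + "hello from inside the virtualenv")


        virtualenv_task = PythonVirtualenvOperator(
            task_id="virtualenv_python",
            python_callable=callable_virtualenv,
            requirements=["colorama==0.4.0"],
            system_site_packages=False,
        )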

489 """ 

490 

491 template_fields: Sequence[str] = tuple({"requirements"} | set(PythonOperator.template_fields)) 

492 template_ext: Sequence[str] = (".txt",) 

493 

494 def __init__( 

495 self, 

496 *, 

497 python_callable: Callable, 

498 requirements: None | Iterable[str] | str = None, 

499 python_version: str | int | float | None = None, 

500 use_dill: bool = False, 

501 system_site_packages: bool = True, 

502 pip_install_options: list[str] | None = None, 

503 op_args: Collection[Any] | None = None, 

504 op_kwargs: Mapping[str, Any] | None = None, 

505 string_args: Iterable[str] | None = None, 

506 templates_dict: dict | None = None, 

507 templates_exts: list[str] | None = None, 

508 expect_airflow: bool = True, 

509 skip_on_exit_code: int | Container[int] | None = None, 

510 **kwargs, 

511 ): 

512 if ( 

513 python_version 

514 and str(python_version)[0] != str(sys.version_info.major) 

515 and (op_args or op_kwargs) 

516 ): 

            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for PythonVirtualenvOperator. Please use string_args. "
                f"Sys version: {sys.version_info}. Venv version: {python_version}"
            )
        if not shutil.which("virtualenv"):
            raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
        if not requirements:
            self.requirements: list[str] | str = []
        elif isinstance(requirements, str):
            self.requirements = requirements
        else:
            self.requirements = list(requirements)
        self.python_version = python_version
        self.system_site_packages = system_site_packages
        self.pip_install_options = pip_install_options
        super().__init__(
            python_callable=python_callable,
            use_dill=use_dill,
            op_args=op_args,
            op_kwargs=op_kwargs,
            string_args=string_args,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            expect_airflow=expect_airflow,
            skip_on_exit_code=skip_on_exit_code,
            **kwargs,
        )

    def execute_callable(self):
        with TemporaryDirectory(prefix="venv") as tmp_dir:
            tmp_path = Path(tmp_dir)
            requirements_file_name = f"{tmp_dir}/requirements.txt"

            if not isinstance(self.requirements, str):
                requirements_file_contents = "\n".join(str(dependency) for dependency in self.requirements)
            else:
                requirements_file_contents = self.requirements

            if not self.system_site_packages and self.use_dill:
                requirements_file_contents += "\ndill"

            with open(requirements_file_name, "w") as file:
                file.write(requirements_file_contents)
            prepare_virtualenv(
                venv_directory=tmp_dir,
                python_bin=f"python{self.python_version}" if self.python_version else None,
                system_site_packages=self.system_site_packages,
                requirements_file_path=requirements_file_name,
                pip_install_options=self.pip_install_options,
            )
            python_path = tmp_path / "bin" / "python"
            result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
            return result

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self.system_site_packages or "apache-airflow" in self.requirements:
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif "pendulum" in self.requirements:
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS


class ExternalPythonOperator(_BasePythonVirtualenvOperator):
    """
    Run a function in a pre-existing virtualenv that is not re-created.

    The environment is reused as-is, without the overhead of creating a new
    virtualenv (with certain caveats).

    The function must be defined using def, and not be
    part of a class. All imports must happen inside the function,
    and no variables outside of the scope may be referenced. A global scope
    variable named virtualenv_string_args will be available (populated by
    string_args). In addition, one can pass arguments through op_args and op_kwargs, and one
    can use a return value.
    Note that if your virtualenv runs in a different Python major version than Airflow,
    you cannot use return values, op_args, op_kwargs, or any macros that are provided to
    Airflow through plugins. You can use string_args though.

    If the Airflow version installed in the external environment differs from the
    version used by the operator, the operator will fail.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:ExternalPythonOperator`

    :param python: Full path string (file-system specific) that points to a Python binary inside
        a virtualenv that should be used (in the ``VENV/bin`` folder). Should be an absolute path
        (so it usually starts with "/" or "X:/", depending on the filesystem/os used).
    :param python_callable: A python function with no references to outside variables,
        defined with def, which will be run in a virtualenv
    :param use_dill: Whether to use dill to serialize
        the args and result (pickle is the default). This allows more complex types
        but if dill is not preinstalled in your venv, the task will fail with use_dill enabled.
    :param op_args: A list of positional arguments to pass to python_callable.
    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
    :param string_args: Strings that are present in the global var virtualenv_string_args,
        available to python_callable at runtime as a list[str]. Note that args are split
        by newline.
    :param templates_dict: a dictionary where the values are templates that
        will get templated by the Airflow engine sometime between
        ``__init__`` and ``execute``, and are made available
        in your callable's context after the template has been applied
    :param templates_exts: a list of file extensions to resolve while
        processing templated fields, for example ``['.sql', '.hql']``
    :param expect_airflow: expect Airflow to be installed in the target environment. If true, the operator
        will raise a warning if Airflow is not installed, and it will attempt to load Airflow
        macros when starting.
    :param skip_on_exit_code: If python_callable exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.
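
    A minimal usage sketch (the task id, interpreter path, and callable are illustrative):

    .. code-block:: python

        def callable_external():
            print("running in a pre-existing virtualenv")


        external_task = ExternalPythonOperator(
            task_id="external_python",
            python="/opt/venvs/my_env/bin/python",
            python_callable=callable_external,
        )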

629 """ 

630 

631 template_fields: Sequence[str] = tuple({"python"} | set(PythonOperator.template_fields)) 

632 

633 def __init__( 

634 self, 

635 *, 

636 python: str, 

637 python_callable: Callable, 

638 use_dill: bool = False, 

639 op_args: Collection[Any] | None = None, 

640 op_kwargs: Mapping[str, Any] | None = None, 

641 string_args: Iterable[str] | None = None, 

642 templates_dict: dict | None = None, 

643 templates_exts: list[str] | None = None, 

644 expect_airflow: bool = True, 

645 expect_pendulum: bool = False, 

646 skip_on_exit_code: int | Container[int] | None = None, 

647 **kwargs, 

648 ): 

649 if not python: 

650 raise ValueError("Python Path must be defined in ExternalPythonOperator") 

651 self.python = python 

652 self.expect_pendulum = expect_pendulum 

653 super().__init__( 

654 python_callable=python_callable, 

655 use_dill=use_dill, 

656 op_args=op_args, 

657 op_kwargs=op_kwargs, 

658 string_args=string_args, 

659 templates_dict=templates_dict, 

660 templates_exts=templates_exts, 

661 expect_airflow=expect_airflow, 

662 skip_on_exit_code=skip_on_exit_code, 

663 **kwargs, 

664 ) 

665 

666 def execute_callable(self): 

667 python_path = Path(self.python) 

        if not python_path.exists():
            raise ValueError(f"Python Path '{python_path}' must exist")
        if not python_path.is_file():
            raise ValueError(f"Python Path '{python_path}' must be a file")
        if not python_path.is_absolute():
            raise ValueError(f"Python Path '{python_path}' must be an absolute path.")
        python_version_as_list_of_strings = self._get_python_version_from_environment()
        if (
            python_version_as_list_of_strings
            and str(python_version_as_list_of_strings[0]) != str(sys.version_info.major)
            and (self.op_args or self.op_kwargs)
        ):
            raise AirflowException(
                "Passing op_args or op_kwargs is not supported across different Python "
                "major versions for ExternalPythonOperator. Please use string_args. "
                f"Sys version: {sys.version_info}. Venv version: {python_version_as_list_of_strings}"
            )
        with TemporaryDirectory(prefix="tmd") as tmp_dir:
            tmp_path = Path(tmp_dir)
            return self._execute_python_callable_in_subprocess(python_path, tmp_path)

    def _get_python_version_from_environment(self) -> list[str]:
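        # NOTE: the parsing below assumes `python --version` prints e.g. "Python 3.8.10";
        # the last token is split on dots, yielding e.g. ["3", "8", "10"].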

        try:
            result = subprocess.check_output([self.python, "--version"], text=True)
            return result.strip().split(" ")[-1].split(".")
        except Exception as e:
            raise ValueError(f"Error while executing {self.python}: {e}")

    def _iter_serializable_context_keys(self):
        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
        if self._get_airflow_version_from_target_env():
            yield from self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
        elif self._is_pendulum_installed_in_target_env():
            yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS

    def _is_pendulum_installed_in_target_env(self) -> bool:
        try:
            subprocess.check_call([self.python, "-c", "import pendulum"])
            return True
        except Exception as e:
            if self.expect_pendulum:
                self.log.warning("When checking for Pendulum installed in venv got %s", e)
                self.log.warning(
712 "Pendulum is not properly installed in the virtualenv " 

713 "Pendulum context keys will not be available. " 

714 "Please Install Pendulum or Airflow in your venv to access them." 

                )
            return False

    def _get_airflow_version_from_target_env(self) -> str | None:
        from airflow import __version__ as airflow_version

        try:
            result = subprocess.check_output(
                [self.python, "-c", "from airflow import __version__; print(__version__)"], text=True
            )
            target_airflow_version = result.strip()
            if target_airflow_version != airflow_version:
                raise AirflowConfigException(
728 f"The version of Airflow installed for the {self.python}(" 

729 f"{target_airflow_version}) is different than the runtime Airflow version: " 

730 f"{airflow_version}. Make sure your environment has the same Airflow version " 

731 f"installed as the Airflow runtime." 

                )
            return target_airflow_version
        except Exception as e:
            if self.expect_airflow:
                self.log.warning("When checking for Airflow installed in venv got %s", e)
                self.log.warning(
                    f"This means that Airflow is not properly installed by "
                    f"{self.python}. Airflow context keys will not be available. "
                    f"Please install Airflow {airflow_version} in your environment to access them."
                )
            return None


def get_current_context() -> Context:
    """
    Retrieve the execution context dictionary without altering user method's signature.
    This is the simplest method of retrieving the execution context dictionary.

    **Old style:**

    .. code:: python

        def my_task(**context):
            ti = context["ti"]

    **New style:**

    .. code:: python

        from airflow.operators.python import get_current_context


        def my_task():
            context = get_current_context()
            ti = context["ti"]

    Current context will only have a value if this method was called after an operator
    started executing.
    """
    if not _CURRENT_CONTEXT:
        raise AirflowException(
            "Current context was requested but no context was found! "
            "Are you running within an airflow task?"
        )
    return _CURRENT_CONTEXT[-1]