Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/models/xcom.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

266 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20import inspect 

21import json 

22import logging 

23import pickle 

24import warnings 

25from functools import wraps 

26from typing import TYPE_CHECKING, Any, Iterable, cast, overload 

27 

28from sqlalchemy import ( 

29 Column, 

30 ForeignKeyConstraint, 

31 Index, 

32 Integer, 

33 LargeBinary, 

34 PrimaryKeyConstraint, 

35 String, 

36 delete, 

37 select, 

38 text, 

39) 

40from sqlalchemy.dialects.mysql import LONGBLOB 

41from sqlalchemy.ext.associationproxy import association_proxy 

42from sqlalchemy.orm import Query, reconstructor, relationship 

43from sqlalchemy.orm.exc import NoResultFound 

44 

45from airflow.api_internal.internal_api_call import internal_api_call 

46from airflow.configuration import conf 

47from airflow.exceptions import RemovedInAirflow3Warning 

48from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies 

49from airflow.utils import timezone 

50from airflow.utils.db import LazySelectSequence 

51from airflow.utils.helpers import exactly_one, is_container 

52from airflow.utils.json import XComDecoder, XComEncoder 

53from airflow.utils.log.logging_mixin import LoggingMixin 

54from airflow.utils.session import NEW_SESSION, provide_session 

55from airflow.utils.sqlalchemy import UtcDateTime 

56 

57# XCom constants below are needed for providers backward compatibility, 

58# which should import the constants directly after apache-airflow>=2.6.0 

59from airflow.utils.xcom import ( 

60 MAX_XCOM_SIZE, # noqa: F401 

61 XCOM_RETURN_KEY, 

62) 

63 

64log = logging.getLogger(__name__) 

65 

66if TYPE_CHECKING: 

67 import datetime 

68 

69 import pendulum 

70 from sqlalchemy.engine import Row 

71 from sqlalchemy.orm import Session 

72 from sqlalchemy.sql.expression import Select, TextClause 

73 

74 from airflow.models.taskinstancekey import TaskInstanceKey 

75 

76 

77class BaseXCom(TaskInstanceDependencies, LoggingMixin): 

78 """Base class for XCom objects.""" 

79 

80 __tablename__ = "xcom" 

81 

82 dag_run_id = Column(Integer(), nullable=False, primary_key=True) 

83 task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False, primary_key=True) 

84 map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1")) 

85 key = Column(String(512, **COLLATION_ARGS), nullable=False, primary_key=True) 

86 

87 # Denormalized for easier lookup. 

88 dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False) 

89 run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False) 

90 

91 value = Column(LargeBinary().with_variant(LONGBLOB, "mysql")) 

92 timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False) 

93 

94 __table_args__ = ( 

95 # Ideally we should create a unique index over (key, dag_id, task_id, run_id), 

96 # but it goes over MySQL's index length limit. So we instead index 'key' 

97 # separately, and enforce uniqueness with DagRun.id instead. 

98 Index("idx_xcom_key", key), 

99 Index("idx_xcom_task_instance", dag_id, task_id, run_id, map_index), 

100 PrimaryKeyConstraint("dag_run_id", "task_id", "map_index", "key", name="xcom_pkey"), 

101 ForeignKeyConstraint( 

102 [dag_id, task_id, run_id, map_index], 

103 [ 

104 "task_instance.dag_id", 

105 "task_instance.task_id", 

106 "task_instance.run_id", 

107 "task_instance.map_index", 

108 ], 

109 name="xcom_task_instance_fkey", 

110 ondelete="CASCADE", 

111 ), 

112 ) 

113 

114 dag_run = relationship( 

115 "DagRun", 

116 primaryjoin="BaseXCom.dag_run_id == foreign(DagRun.id)", 

117 uselist=False, 

118 lazy="joined", 

119 passive_deletes="all", 

120 ) 

121 execution_date = association_proxy("dag_run", "execution_date") 

122 

@reconstructor
def init_on_load(self):
    """
    Swap the stored value for its ORM-deserialized form.

    Registered as an SQLAlchemy reconstructor, so the ORM invokes it after the
    instance has been loaded from the DB or otherwise reconstituted.
    """
    deserialized = self.orm_deserialize_value()
    self.value = deserialized

131 

def __repr__(self):
    """Return a short debug label; a non-negative map index is shown in brackets."""
    if self.map_index < 0:
        task_label = self.task_id
    else:
        task_label = f"{self.task_id}[{self.map_index}]"
    return f'<XCom "{self.key}" ({task_label} @ {self.run_id})>'

136 

@overload
@classmethod
def set(
    cls,
    key: str,
    value: Any,
    *,
    dag_id: str,
    task_id: str,
    run_id: str,
    map_index: int = -1,
    session: Session = NEW_SESSION,
) -> None:
    """Store an XCom value.

    This is the current calling convention. A deprecated form takes
    ``execution_date`` in place of ``run_id``; the two are mutually exclusive.

    :param key: Key under which the XCom is stored.
    :param value: XCom value to store.
    :param dag_id: DAG ID.
    :param task_id: Task ID.
    :param run_id: DAG run ID for the task.
    :param map_index: Optional map index to assign the XCom to for a mapped
        task. Defaults to ``-1`` (non-mapped task).
    :param session: Database session. If not given, a new session will be
        created for this function.
    """


@overload
@classmethod
def set(
    cls,
    key: str,
    value: Any,
    task_id: str,
    dag_id: str,
    execution_date: datetime.datetime,
    session: Session = NEW_SESSION,
) -> None:
    """Store an XCom value (deprecated ``execution_date`` form).

    :sphinx-autoapi-skip:
    """


@classmethod
@provide_session
def set(
    cls,
    key: str,
    value: Any,
    task_id: str,
    dag_id: str,
    execution_date: datetime.datetime | None = None,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
    map_index: int = -1,
) -> None:
    """Store an XCom value.

    :sphinx-autoapi-skip:
    """
    from airflow.models.dagrun import DagRun

    has_execution_date = execution_date is not None
    has_run_id = run_id is not None
    if not exactly_one(has_execution_date, has_run_id):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )

    if has_run_id:
        # Modern path: look up the DagRun primary key for the given run_id.
        dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
        if dag_run_id is None:
            raise ValueError(f"DAG run not found on DAG {dag_id!r} with ID {run_id!r}")
    else:
        # Deprecated path: resolve both the DagRun primary key and its run_id
        # from the execution date.
        warnings.warn(
            "Passing 'execution_date' to 'XCom.set()' is deprecated. Use 'run_id' instead.",
            RemovedInAirflow3Warning,
            stacklevel=3,
        )
        try:
            dag_run_id, run_id = (
                session.query(DagRun.id, DagRun.run_id)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .one()
            )
        except NoResultFound:
            raise ValueError(f"DAG run not found on DAG {dag_id!r} at {execution_date}") from None

    # A LazySelectSequence is meant to be a "lazy list" that avoids pulling a
    # ton of XComs unnecessarily. When one is pushed into XCom it is resolved
    # to a plain list, with a warning about the performance implications, so
    # the implementation detail does not leak into storage.
    if isinstance(value, LazySelectSequence):
        subject = "return value" if key == XCOM_RETURN_KEY else f"value {key}"
        log.warning(
            "Coercing mapped lazy proxy %s from task %s (DAG %s, run %s) "
            "to list, which may degrade performance. Review resource "
            "requirements for this operation, and call list() to suppress "
            "this message. See Dynamic Task Mapping documentation for "
            "more information about lazy proxy objects.",
            subject,
            task_id,
            dag_id,
            run_id or execution_date,
        )
        value = list(value)

    value = cls.serialize_value(
        value=value,
        key=key,
        task_id=task_id,
        dag_id=dag_id,
        run_id=run_id,
        map_index=map_index,
    )

    # Remove duplicate XComs and insert a new one.
    stale_rows = delete(cls).where(
        cls.key == key,
        cls.run_id == run_id,
        cls.task_id == task_id,
        cls.dag_id == dag_id,
        cls.map_index == map_index,
    )
    session.execute(stale_rows)

    # cast() works around Mypy complaining that the model defines no '__init__'.
    xcom_row = cast(Any, cls)(
        dag_run_id=dag_run_id,
        key=key,
        value=value,
        run_id=run_id,
        task_id=task_id,
        dag_id=dag_id,
        map_index=map_index,
    )
    session.add(xcom_row)
    session.flush()

275 

@staticmethod
@provide_session
@internal_api_call
def get_value(
    *,
    ti_key: TaskInstanceKey,
    key: str | None = None,
    session: Session = NEW_SESSION,
) -> Any:
    """Retrieve an XCom value for a task instance.

    Thin wrapper that unpacks ``ti_key`` and delegates to :meth:`get_one`, so
    it returns "full" XCom values (i.e. uses ``deserialize_value`` from the
    XCom backend). Use :meth:`get_many` if you want the "shortened" value via
    ``orm_deserialize_value``.

    If there are no results, *None* is returned. If multiple XCom entries
    match the criteria, an arbitrary one is returned.

    :param ti_key: The TaskInstanceKey to look up the XCom for.
    :param key: A key for the XCom. If provided, only XCom with matching
        keys will be returned. Pass *None* (default) to remove the filter.
    :param session: Database session. If not given, a new session will be
        created for this function.
    """
    lookup = {
        "key": key,
        "task_id": ti_key.task_id,
        "dag_id": ti_key.dag_id,
        "run_id": ti_key.run_id,
        "map_index": ti_key.map_index,
        "session": session,
    }
    return BaseXCom.get_one(**lookup)

308 

@overload
@staticmethod
@internal_api_call
def get_one(
    *,
    key: str | None = None,
    dag_id: str | None = None,
    task_id: str | None = None,
    run_id: str | None = None,
    map_index: int | None = None,
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
) -> Any | None:
    """Retrieve an XCom value, optionally meeting certain criteria.

    This method returns "full" XCom values (i.e. uses ``deserialize_value``
    from the XCom backend). Use :meth:`get_many` if you want the "shortened"
    value via ``orm_deserialize_value``.

    If there are no results, *None* is returned. If multiple XCom entries
    match the criteria, an arbitrary one is returned.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    .. seealso:: ``get_value()`` is a convenience function if you already
        have a structured TaskInstance or TaskInstanceKey object available.

    :param run_id: DAG run ID for the task.
    :param dag_id: Only pull XCom from this DAG. Pass *None* (default) to
        remove the filter.
    :param task_id: Only XCom from task with matching ID will be pulled.
        Pass *None* (default) to remove the filter.
    :param map_index: Only XCom from task with matching map index will be
        pulled. Pass *None* (default) to remove the filter.
    :param key: A key for the XCom. If provided, only XCom with matching
        keys will be returned. Pass *None* (default) to remove the filter.
    :param include_prior_dates: If *False* (default), only XCom from the
        specified DAG run is returned. If *True*, the latest matching XCom is
        returned regardless of the run it belongs to.
    :param session: Database session. If not given, a new session will be
        created for this function.
    """


@overload
@staticmethod
@internal_api_call
def get_one(
    execution_date: datetime.datetime,
    key: str | None = None,
    task_id: str | None = None,
    dag_id: str | None = None,
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
) -> Any | None:
    """Retrieve an XCom value, optionally meeting certain criteria.

    :sphinx-autoapi-skip:
    """


@staticmethod
@provide_session
@internal_api_call
def get_one(
    execution_date: datetime.datetime | None = None,
    key: str | None = None,
    task_id: str | None = None,
    dag_id: str | None = None,
    include_prior_dates: bool = False,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
    map_index: int | None = None,
) -> Any | None:
    """Retrieve an XCom value, optionally meeting certain criteria.

    Builds a one-row query through :meth:`get_many` and deserializes the
    ``value`` column of the first row with ``XCom.deserialize_value``.

    :raises ValueError: If neither or both of ``run_id`` and
        ``execution_date`` are passed.

    :sphinx-autoapi-skip:
    """
    if not exactly_one(execution_date is not None, run_id is not None):
        raise ValueError("Exactly one of run_id or execution_date must be passed")

    # Branch on ``is not None`` so the test agrees with the validation above;
    # a plain truthiness test would send an empty-string run_id to the
    # "Should not happen?" branch below.
    if run_id is not None:
        query = BaseXCom.get_many(
            run_id=run_id,
            key=key,
            task_ids=task_id,
            dag_ids=dag_id,
            map_indexes=map_index,
            include_prior_dates=include_prior_dates,
            limit=1,
            session=session,
        )
    elif execution_date is not None:
        message = "Passing 'execution_date' to 'XCom.get_one()' is deprecated. Use 'run_id' instead."
        warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)

        # get_many() emits its own deprecation warning for execution_date;
        # silence it so the caller only sees the one raised above.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
            query = BaseXCom.get_many(
                execution_date=execution_date,
                key=key,
                task_ids=task_id,
                dag_ids=dag_id,
                map_indexes=map_index,
                include_prior_dates=include_prior_dates,
                limit=1,
                session=session,
            )
    else:
        # Unreachable given the exactly_one() check; kept as a defensive guard.
        raise RuntimeError("Should not happen?")

    result = query.with_entities(BaseXCom.value).first()
    if result:
        return XCom.deserialize_value(result)
    return None

423 

@overload
@staticmethod
def get_many(
    *,
    run_id: str,
    key: str | None = None,
    task_ids: str | Iterable[str] | None = None,
    dag_ids: str | Iterable[str] | None = None,
    map_indexes: int | Iterable[int] | None = None,
    include_prior_dates: bool = False,
    limit: int | None = None,
    session: Session = NEW_SESSION,
) -> Query:
    """Compose a query to get one or more XCom entries.

    The return value is an SQLAlchemy query of full XCom objects. If you
    just want one stored value, use :meth:`get_one` instead.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    :param run_id: DAG run ID for the task.
    :param key: A key for the XComs. If provided, only XComs with matching
        keys will be returned. Pass *None* (default) to remove the filter.
    :param task_ids: Only XComs from task with matching IDs will be pulled.
        Pass *None* (default) to remove the filter.
    :param dag_ids: Only pulls XComs from specified DAGs. Pass *None*
        (default) to remove the filter.
    :param map_indexes: Only XComs from matching map indexes will be pulled.
        Pass *None* (default) to remove the filter.
    :param include_prior_dates: If *False* (default), only XComs from the
        specified DAG run are returned. If *True*, all matching XComs are
        returned regardless of the run it belongs to.
    :param session: Database session. If not given, a new session will be
        created for this function.
    :param limit: Maximum number of XComs to return.
    """


@overload
@staticmethod
@internal_api_call
def get_many(
    execution_date: datetime.datetime,
    key: str | None = None,
    task_ids: str | Iterable[str] | None = None,
    dag_ids: str | Iterable[str] | None = None,
    map_indexes: int | Iterable[int] | None = None,
    include_prior_dates: bool = False,
    limit: int | None = None,
    session: Session = NEW_SESSION,
) -> Query:
    """Compose a query to get one or more XCom entries.

    :sphinx-autoapi-skip:
    """


@staticmethod
@provide_session
@internal_api_call
def get_many(
    execution_date: datetime.datetime | None = None,
    key: str | None = None,
    task_ids: str | Iterable[str] | None = None,
    dag_ids: str | Iterable[str] | None = None,
    map_indexes: int | Iterable[int] | None = None,
    include_prior_dates: bool = False,
    limit: int | None = None,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
) -> Query:
    """Compose a query to get one or more XCom entries.

    :sphinx-autoapi-skip:
    """
    from airflow.models.dagrun import DagRun

    by_date = execution_date is not None
    if not exactly_one(by_date, run_id is not None):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )
    if by_date:
        warnings.warn(
            "Passing 'execution_date' to 'XCom.get_many()' is deprecated. Use 'run_id' instead.",
            RemovedInAirflow3Warning,
            stacklevel=3,
        )

    xcom_query = session.query(BaseXCom).join(BaseXCom.dag_run)

    if key:
        xcom_query = xcom_query.filter(BaseXCom.key == key)

    # Each *_ids argument may be a single value or a container of values.
    if is_container(task_ids):
        xcom_query = xcom_query.filter(BaseXCom.task_id.in_(task_ids))
    elif task_ids is not None:
        xcom_query = xcom_query.filter(BaseXCom.task_id == task_ids)

    if is_container(dag_ids):
        xcom_query = xcom_query.filter(BaseXCom.dag_id.in_(dag_ids))
    elif dag_ids is not None:
        xcom_query = xcom_query.filter(BaseXCom.dag_id == dag_ids)

    # A contiguous range is expressed as a pair of bounds instead of an IN list.
    if isinstance(map_indexes, range) and map_indexes.step == 1:
        xcom_query = xcom_query.filter(
            BaseXCom.map_index >= map_indexes.start, BaseXCom.map_index < map_indexes.stop
        )
    elif is_container(map_indexes):
        xcom_query = xcom_query.filter(BaseXCom.map_index.in_(map_indexes))
    elif map_indexes is not None:
        xcom_query = xcom_query.filter(BaseXCom.map_index == map_indexes)

    # Restrict by run: either up to and including the reference date, or
    # exactly the requested run.
    if include_prior_dates and by_date:
        xcom_query = xcom_query.filter(DagRun.execution_date <= execution_date)
    elif include_prior_dates:
        run_dates = session.query(DagRun.execution_date).filter(DagRun.run_id == run_id).subquery()
        xcom_query = xcom_query.filter(BaseXCom.execution_date <= run_dates.c.execution_date)
    elif by_date:
        xcom_query = xcom_query.filter(DagRun.execution_date == execution_date)
    else:
        xcom_query = xcom_query.filter(BaseXCom.run_id == run_id)

    xcom_query = xcom_query.order_by(DagRun.execution_date.desc(), BaseXCom.timestamp.desc())
    return xcom_query.limit(limit) if limit else xcom_query

549 

@classmethod
@provide_session
def delete(cls, xcoms: XCom | Iterable[XCom], session: Session = NEW_SESSION) -> None:
    """Delete one or multiple XCom entries.

    Each entry is first passed to ``XCom.purge`` and then deleted from the
    session; the session is committed once after all entries are processed.

    :param xcoms: A single XCom or an iterable of XComs to delete.
    :param session: Database session. Defaults to ``NEW_SESSION`` like the
        other ``@provide_session`` methods on this class, so callers may
        omit it.
    :raises TypeError: If any element is not an ``XCom`` instance.
    """
    if isinstance(xcoms, XCom):
        xcoms = [xcoms]
    for xcom in xcoms:
        if not isinstance(xcom, XCom):
            raise TypeError(f"Expected XCom; received {xcom.__class__.__name__}")
        XCom.purge(xcom, session)
        session.delete(xcom)
    session.commit()

562 

@staticmethod
def purge(xcom: XCom, session: Session) -> None:
    """Purge an XCom entry from underlying storage implementations.

    The base implementation does nothing; it is called by ``delete`` and
    ``clear`` before each row is removed from the session.
    """
    return None

567 

@overload
@staticmethod
@internal_api_call
def clear(
    *,
    dag_id: str,
    task_id: str,
    run_id: str,
    map_index: int | None = None,
    session: Session = NEW_SESSION,
) -> None:
    """Clear all XCom data from the database for the given task instance.

    A deprecated form of this function accepts ``execution_date`` instead of
    ``run_id``. The two arguments are mutually exclusive.

    :param dag_id: ID of DAG to clear the XCom for.
    :param task_id: ID of task to clear the XCom for.
    :param run_id: ID of DAG run to clear the XCom for.
    :param map_index: If given, only clear XCom from this particular mapped
        task. The default ``None`` clears *all* XComs from the task.
    :param session: Database session. If not given, a new session will be
        created for this function.
    """


@overload
@staticmethod
@internal_api_call
def clear(
    execution_date: pendulum.DateTime,
    dag_id: str,
    task_id: str,
    session: Session = NEW_SESSION,
) -> None:
    """Clear all XCom data from the database for the given task instance.

    :sphinx-autoapi-skip:
    """


@staticmethod
@provide_session
@internal_api_call
def clear(
    execution_date: pendulum.DateTime | None = None,
    dag_id: str | None = None,
    task_id: str | None = None,
    session: Session = NEW_SESSION,
    *,
    run_id: str | None = None,
    map_index: int | None = None,
) -> None:
    """Clear all XCom data from the database for the given task instance.

    :sphinx-autoapi-skip:
    """
    from airflow.models import DagRun

    # ``execution_date`` was historically the first positional argument, so
    # adding optional parameters forced defaults onto everything; the really
    # required arguments are therefore validated by hand.
    if dag_id is None:
        raise TypeError("clear() missing required argument: dag_id")
    if task_id is None:
        raise TypeError("clear() missing required argument: task_id")

    by_date = execution_date is not None
    if not exactly_one(by_date, run_id is not None):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )

    if by_date:
        warnings.warn(
            "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead.",
            RemovedInAirflow3Warning,
            stacklevel=3,
        )
        # Translate the deprecated execution date into the matching run_id.
        run_id = (
            session.query(DagRun.run_id)
            .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
            .scalar()
        )

    matching = session.query(BaseXCom).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
    if map_index is not None:
        matching = matching.filter_by(map_index=map_index)

    # Give the backend a chance to purge each entry before the row is deleted.
    for entry in matching:
        XCom.purge(entry, session)
        session.delete(entry)

    session.commit()

657 

@staticmethod
def serialize_value(
    value: Any,
    *,
    key: str | None = None,
    task_id: str | None = None,
    dag_id: str | None = None,
    run_id: str | None = None,
    map_index: int | None = None,
) -> Any:
    """Serialize an XCom value to pickled bytes or UTF-8 encoded JSON.

    Pickle is used when ``[core] enable_xcom_pickling`` is enabled; otherwise
    the value is JSON-encoded with ``XComEncoder``. The keyword-only
    parameters are not used by this base implementation.

    :raises ValueError, TypeError: Re-raised after logging when JSON
        serialization fails.
    """
    use_pickle = conf.getboolean("core", "enable_xcom_pickling")
    if use_pickle:
        return pickle.dumps(value)

    failure_hint = (
        "%s."
        " If you are using pickle instead of JSON for XCom,"
        " then you need to enable pickle support for XCom"
        " in your airflow config or make sure to decorate your"
        " object with attr."
    )
    try:
        return json.dumps(value, cls=XComEncoder).encode("UTF-8")
    except (ValueError, TypeError) as ex:
        log.error(failure_hint, ex)
        raise

683 

@staticmethod
def _deserialize_value(result: XCom, orm: bool) -> Any:
    """Decode ``result.value`` from pickle or JSON.

    :param result: Object whose ``value`` attribute holds the serialized bytes.
    :param orm: When true, JSON decoding uses ``XComDecoder.orm_object_hook``.
    :return: The decoded value, or *None* when nothing is stored.
    """
    object_hook = XComDecoder.orm_object_hook if orm else None

    def _from_json() -> Any:
        return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)

    if result.value is None:
        return None

    if not conf.getboolean("core", "enable_xcom_pickling"):
        # Since xcom_pickling is disabled, we should only try to deserialize with JSON
        return _from_json()

    try:
        # NOTE(review): pickle.loads executes arbitrary code from the stored
        # bytes; this is only safe if everything written to the table is trusted.
        return pickle.loads(result.value)
    except pickle.UnpicklingError:
        # Not a pickle payload; fall back to JSON.
        return _from_json()

700 

@staticmethod
def deserialize_value(result: XCom) -> Any:
    """Deserialize the full XCom value from its JSON or pickle form (non-ORM mode)."""
    return BaseXCom._deserialize_value(result, orm=False)

705 

def orm_deserialize_value(self) -> Any:
    """
    Deserialize the value when reconstructing the ORM XCom object.

    Custom XCom backends should override this method to avoid unnecessary
    requests or other resource-consuming operations while the ORM model is
    being created, for example when the webserver lists XComs.
    """
    return BaseXCom._deserialize_value(self, orm=True)

716 

717 

class LazyXComSelectSequence(LazySelectSequence[Any]):
    """Lazy, list-like view over XCom values.

    :meta private:
    """

    @staticmethod
    def _rebuild_select(stmt: TextClause) -> Select:
        """Wrap a textual statement in a SELECT of ``XCom.value``."""
        value_select = select(XCom.value)
        return value_select.from_statement(stmt)

    @staticmethod
    def _process_row(row: Row) -> Any:
        """Deserialize one result row with ``XCom.deserialize_value``."""
        return XCom.deserialize_value(row)

731 

732 

733def _patch_outdated_serializer(clazz: type[BaseXCom], params: Iterable[str]) -> None: 

734 """Patch a custom ``serialize_value`` to accept the modern signature. 

735 

736 To give custom XCom backends more flexibility with how they store values, we 

737 now forward all params passed to ``XCom.set`` to ``XCom.serialize_value``. 

738 In order to maintain compatibility with custom XCom backends written with 

739 the old signature, we check the signature and, if necessary, patch with a 

740 method that ignores kwargs the backend does not accept. 

741 """ 

742 old_serializer = clazz.serialize_value 

743 

744 @wraps(old_serializer) 

745 def _shim(**kwargs): 

746 kwargs = {k: kwargs.get(k) for k in params} 

747 warnings.warn( 

748 f"Method `serialize_value` in XCom backend {XCom.__name__} is using outdated signature and" 

749 f"must be updated to accept all params in `BaseXCom.set` except `session`. Support will be " 

750 f"removed in a future release.", 

751 RemovedInAirflow3Warning, 

752 stacklevel=1, 

753 ) 

754 return old_serializer(**kwargs) 

755 

756 clazz.serialize_value = _shim # type: ignore[assignment] 

757 

758 

759def _get_function_params(function) -> list[str]: 

760 """ 

761 Return the list of variables names of a function. 

762 

763 :param function: The function to inspect 

764 """ 

765 parameters = inspect.signature(function).parameters 

766 bound_arguments = [ 

767 name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD) 

768 ] 

769 return bound_arguments 

770 

771 

def resolve_xcom_backend() -> type[BaseXCom]:
    """Resolve the XCom class configured under ``[core] xcom_backend``.

    Confirms that the custom XCom class extends ``BaseXCom``. When the set of
    parameter names of its ``serialize_value`` differs from that of
    ``BaseXCom.serialize_value``, the custom method is patched with a
    compatibility shim.

    :raises TypeError: If the configured class is not a ``BaseXCom`` subclass.
    """
    default_path = f"airflow.models.xcom.{BaseXCom.__name__}"
    clazz = conf.getimport("core", "xcom_backend", fallback=default_path)
    if not clazz:
        return BaseXCom
    if not issubclass(clazz, BaseXCom):
        raise TypeError(
            f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`."
        )

    expected_params = _get_function_params(BaseXCom.serialize_value)
    backend_params = _get_function_params(clazz.serialize_value)
    signatures_match = set(expected_params) == set(backend_params)
    if not signatures_match:
        _patch_outdated_serializer(clazz=clazz, params=backend_params)
    return clazz

790 

791 

# Bind the public ``XCom`` name. Type checkers see it as a plain alias of
# BaseXCom; at runtime it is whatever class resolve_xcom_backend() returns.
if TYPE_CHECKING:
    XCom = BaseXCom  # Hack to avoid Mypy "Variable 'XCom' is not valid as a type".
else:
    XCom = resolve_xcom_backend()