Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/xcom.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import inspect
21import json
22import logging
23import pickle
24import warnings
25from functools import wraps
26from typing import TYPE_CHECKING, Any, Iterable, cast, overload
28from sqlalchemy import (
29 Column,
30 ForeignKeyConstraint,
31 Index,
32 Integer,
33 LargeBinary,
34 PrimaryKeyConstraint,
35 String,
36 delete,
37 select,
38 text,
39)
40from sqlalchemy.dialects.mysql import LONGBLOB
41from sqlalchemy.ext.associationproxy import association_proxy
42from sqlalchemy.orm import Query, reconstructor, relationship
43from sqlalchemy.orm.exc import NoResultFound
45from airflow.api_internal.internal_api_call import internal_api_call
46from airflow.configuration import conf
47from airflow.exceptions import RemovedInAirflow3Warning
48from airflow.models.base import COLLATION_ARGS, ID_LEN, TaskInstanceDependencies
49from airflow.utils import timezone
50from airflow.utils.db import LazySelectSequence
51from airflow.utils.helpers import exactly_one, is_container
52from airflow.utils.json import XComDecoder, XComEncoder
53from airflow.utils.log.logging_mixin import LoggingMixin
54from airflow.utils.session import NEW_SESSION, provide_session
55from airflow.utils.sqlalchemy import UtcDateTime
57# XCom constants below are needed for providers backward compatibility,
58# which should import the constants directly after apache-airflow>=2.6.0
59from airflow.utils.xcom import (
60 MAX_XCOM_SIZE, # noqa: F401
61 XCOM_RETURN_KEY,
62)
64log = logging.getLogger(__name__)
66if TYPE_CHECKING:
67 import datetime
69 import pendulum
70 from sqlalchemy.engine import Row
71 from sqlalchemy.orm import Session
72 from sqlalchemy.sql.expression import Select, TextClause
74 from airflow.models.taskinstancekey import TaskInstanceKey
class BaseXCom(TaskInstanceDependencies, LoggingMixin):
    """Base class for XCom objects.

    ORM model mapped to the ``xcom`` table. A row is identified by
    ``(dag_run_id, task_id, map_index, key)`` and references the owning task
    instance through ``(dag_id, task_id, run_id, map_index)``.

    The ``serialize_value`` / ``deserialize_value`` / ``orm_deserialize_value``
    / ``purge`` hooks are the ones the rest of this class calls through
    ``cls`` or ``XCom``.
    """

    __tablename__ = "xcom"

    dag_run_id = Column(Integer(), nullable=False, primary_key=True)
    task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False, primary_key=True)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
    key = Column(String(512, **COLLATION_ARGS), nullable=False, primary_key=True)

    # Denormalized for easier lookup.
    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
    run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)

    # Serialized payload; MySQL uses LONGBLOB instead of the generic LargeBinary.
    value = Column(LargeBinary().with_variant(LONGBLOB, "mysql"))
    timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

    __table_args__ = (
        # Ideally we should create a unique index over (key, dag_id, task_id, run_id),
        # but it goes over MySQL's index length limit. So we instead index 'key'
        # separately, and enforce uniqueness with DagRun.id instead.
        Index("idx_xcom_key", key),
        Index("idx_xcom_task_instance", dag_id, task_id, run_id, map_index),
        PrimaryKeyConstraint("dag_run_id", "task_id", "map_index", "key", name="xcom_pkey"),
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="xcom_task_instance_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship(
        "DagRun",
        primaryjoin="BaseXCom.dag_run_id == foreign(DagRun.id)",
        uselist=False,
        lazy="joined",
        passive_deletes="all",
    )
    # Exposes the related DagRun's ``execution_date`` directly on the XCom row.
    execution_date = association_proxy("dag_run", "execution_date")

    @reconstructor
    def init_on_load(self):
        """
        Execute after the instance has been loaded from the DB or otherwise reconstituted; called by the ORM.

        i.e automatically deserialize Xcom value when loading from DB.
        """
        self.value = self.orm_deserialize_value()

    def __repr__(self):
        # Negative map_index marks a non-mapped task, so the index is omitted.
        if self.map_index < 0:
            return f'<XCom "{self.key}" ({self.task_id} @ {self.run_id})>'
        return f'<XCom "{self.key}" ({self.task_id}[{self.map_index}] @ {self.run_id})>'

    @overload
    @classmethod
    def set(
        cls,
        key: str,
        value: Any,
        *,
        dag_id: str,
        task_id: str,
        run_id: str,
        map_index: int = -1,
        session: Session = NEW_SESSION,
    ) -> None:
        """Store an XCom value.

        A deprecated form of this function accepts ``execution_date`` instead of
        ``run_id``. The two arguments are mutually exclusive.

        :param key: Key to store the XCom.
        :param value: XCom value to store.
        :param dag_id: DAG ID.
        :param task_id: Task ID.
        :param run_id: DAG run ID for the task.
        :param map_index: Optional map index to assign XCom for a mapped task.
            The default is ``-1`` (set for a non-mapped task).
        :param session: Database session. If not given, a new session will be
            created for this function.
        """

    @overload
    @classmethod
    def set(
        cls,
        key: str,
        value: Any,
        task_id: str,
        dag_id: str,
        execution_date: datetime.datetime,
        session: Session = NEW_SESSION,
    ) -> None:
        """Store an XCom value.

        :sphinx-autoapi-skip:
        """

    @classmethod
    @provide_session
    def set(
        cls,
        key: str,
        value: Any,
        task_id: str,
        dag_id: str,
        execution_date: datetime.datetime | None = None,
        session: Session = NEW_SESSION,
        *,
        run_id: str | None = None,
        map_index: int = -1,
    ) -> None:
        """Store an XCom value.

        Resolves the owning DAG run, serializes ``value`` through
        ``cls.serialize_value``, deletes any existing row with the same
        key / run / task / DAG / map index, then adds the new row and flushes
        the session.

        :raises ValueError: If not exactly one of ``run_id`` and
            ``execution_date`` is given, or if no matching DAG run is found.

        :sphinx-autoapi-skip:
        """
        from airflow.models.dagrun import DagRun

        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError(
                f"Exactly one of run_id or execution_date must be passed. "
                f"Passed execution_date={execution_date}, run_id={run_id}"
            )

        if run_id is None:
            message = "Passing 'execution_date' to 'XCom.set()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
            try:
                dag_run_id, run_id = (
                    session.query(DagRun.id, DagRun.run_id)
                    .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                    .one()
                )
            except NoResultFound:
                raise ValueError(f"DAG run not found on DAG {dag_id!r} at {execution_date}") from None
        else:
            dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
            if dag_run_id is None:
                raise ValueError(f"DAG run not found on DAG {dag_id!r} with ID {run_id!r}")

        # Seamlessly resolve LazySelectSequence to a list. This intends to work
        # as a "lazy list" to avoid pulling a ton of XComs unnecessarily, but if
        # it's pushed into XCom, the user should be aware of the performance
        # implications, and this avoids leaking the implementation detail.
        if isinstance(value, LazySelectSequence):
            warning_message = (
                "Coercing mapped lazy proxy %s from task %s (DAG %s, run %s) "
                "to list, which may degrade performance. Review resource "
                "requirements for this operation, and call list() to suppress "
                "this message. See Dynamic Task Mapping documentation for "
                "more information about lazy proxy objects."
            )
            log.warning(
                warning_message,
                "return value" if key == XCOM_RETURN_KEY else f"value {key}",
                task_id,
                dag_id,
                run_id or execution_date,
            )
            value = list(value)

        value = cls.serialize_value(
            value=value,
            key=key,
            task_id=task_id,
            dag_id=dag_id,
            run_id=run_id,
            map_index=map_index,
        )

        # Remove duplicate XComs and insert a new one.
        session.execute(
            delete(cls).where(
                cls.key == key,
                cls.run_id == run_id,
                cls.task_id == task_id,
                cls.dag_id == dag_id,
                cls.map_index == map_index,
            )
        )
        new = cast(Any, cls)(  # Work around Mypy complaining model not defining '__init__'.
            dag_run_id=dag_run_id,
            key=key,
            value=value,
            run_id=run_id,
            task_id=task_id,
            dag_id=dag_id,
            map_index=map_index,
        )
        session.add(new)
        session.flush()

    @staticmethod
    @provide_session
    @internal_api_call
    def get_value(
        *,
        ti_key: TaskInstanceKey,
        key: str | None = None,
        session: Session = NEW_SESSION,
    ) -> Any:
        """Retrieve an XCom value for a task instance.

        This method returns "full" XCom values (i.e. uses ``deserialize_value``
        from the XCom backend). Use :meth:`get_many` if you want the "shortened"
        value via ``orm_deserialize_value``.

        If there are no results, *None* is returned. If multiple XCom entries
        match the criteria, an arbitrary one is returned.

        :param ti_key: The TaskInstanceKey to look up the XCom for.
        :param key: A key for the XCom. If provided, only XCom with matching
            keys will be returned. Pass *None* (default) to remove the filter.
        :param session: Database session. If not given, a new session will be
            created for this function.
        """
        return BaseXCom.get_one(
            key=key,
            task_id=ti_key.task_id,
            dag_id=ti_key.dag_id,
            run_id=ti_key.run_id,
            map_index=ti_key.map_index,
            session=session,
        )

    @overload
    @staticmethod
    @internal_api_call
    def get_one(
        *,
        key: str | None = None,
        dag_id: str | None = None,
        task_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
        include_prior_dates: bool = False,
        session: Session = NEW_SESSION,
    ) -> Any | None:
        """Retrieve an XCom value, optionally meeting certain criteria.

        This method returns "full" XCom values (i.e. uses ``deserialize_value``
        from the XCom backend). Use :meth:`get_many` if you want the "shortened"
        value via ``orm_deserialize_value``.

        If there are no results, *None* is returned. If multiple XCom entries
        match the criteria, an arbitrary one is returned.

        A deprecated form of this function accepts ``execution_date`` instead of
        ``run_id``. The two arguments are mutually exclusive.

        .. seealso:: ``get_value()`` is a convenience function if you already
            have a structured TaskInstance or TaskInstanceKey object available.

        :param run_id: DAG run ID for the task.
        :param dag_id: Only pull XCom from this DAG. Pass *None* (default) to
            remove the filter.
        :param task_id: Only XCom from task with matching ID will be pulled.
            Pass *None* (default) to remove the filter.
        :param map_index: Only XCom from task with matching map index will be
            pulled. Pass *None* (default) to remove the filter.
        :param key: A key for the XCom. If provided, only XCom with matching
            keys will be returned. Pass *None* (default) to remove the filter.
        :param include_prior_dates: If *False* (default), only XCom from the
            specified DAG run is returned. If *True*, the latest matching XCom is
            returned regardless of the run it belongs to.
        :param session: Database session. If not given, a new session will be
            created for this function.
        """

    @overload
    @staticmethod
    @internal_api_call
    def get_one(
        execution_date: datetime.datetime,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        include_prior_dates: bool = False,
        session: Session = NEW_SESSION,
    ) -> Any | None:
        """Retrieve an XCom value, optionally meeting certain criteria.

        :sphinx-autoapi-skip:
        """

    @staticmethod
    @provide_session
    @internal_api_call
    def get_one(
        execution_date: datetime.datetime | None = None,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        include_prior_dates: bool = False,
        session: Session = NEW_SESSION,
        *,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> Any | None:
        """Retrieve an XCom value, optionally meeting certain criteria.

        Builds a ``get_many`` query limited to one row and returns the first
        value passed through ``XCom.deserialize_value``, or *None* when the
        query yields nothing.

        :raises ValueError: If not exactly one of ``run_id`` and
            ``execution_date`` is given.

        :sphinx-autoapi-skip:
        """
        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError("Exactly one of run_id or execution_date must be passed")

        # Branch on "is not None" so the test agrees with the validation above;
        # a plain truthiness test would send run_id="" down neither path.
        if run_id is not None:
            query = BaseXCom.get_many(
                run_id=run_id,
                key=key,
                task_ids=task_id,
                dag_ids=dag_id,
                map_indexes=map_index,
                include_prior_dates=include_prior_dates,
                limit=1,
                session=session,
            )
        else:
            message = "Passing 'execution_date' to 'XCom.get_one()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)

            # get_many() emits its own deprecation warning for execution_date;
            # silence it so the caller is only warned once.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", RemovedInAirflow3Warning)
                query = BaseXCom.get_many(
                    execution_date=execution_date,
                    key=key,
                    task_ids=task_id,
                    dag_ids=dag_id,
                    map_indexes=map_index,
                    include_prior_dates=include_prior_dates,
                    limit=1,
                    session=session,
                )

        result = query.with_entities(BaseXCom.value).first()
        if result:
            return XCom.deserialize_value(result)
        return None

    @overload
    @staticmethod
    def get_many(
        *,
        run_id: str,
        key: str | None = None,
        task_ids: str | Iterable[str] | None = None,
        dag_ids: str | Iterable[str] | None = None,
        map_indexes: int | Iterable[int] | None = None,
        include_prior_dates: bool = False,
        limit: int | None = None,
        session: Session = NEW_SESSION,
    ) -> Query:
        """Composes a query to get one or more XCom entries.

        This function returns an SQLAlchemy query of full XCom objects. If you
        just want one stored value, use :meth:`get_one` instead.

        A deprecated form of this function accepts ``execution_date`` instead of
        ``run_id``. The two arguments are mutually exclusive.

        :param run_id: DAG run ID for the task.
        :param key: A key for the XComs. If provided, only XComs with matching
            keys will be returned. Pass *None* (default) to remove the filter.
        :param task_ids: Only XComs from task with matching IDs will be pulled.
            Pass *None* (default) to remove the filter.
        :param dag_ids: Only pulls XComs from specified DAGs. Pass *None*
            (default) to remove the filter.
        :param map_indexes: Only XComs from matching map indexes will be pulled.
            Pass *None* (default) to remove the filter.
        :param include_prior_dates: If *False* (default), only XComs from the
            specified DAG run are returned. If *True*, all matching XComs are
            returned regardless of the run it belongs to.
        :param session: Database session. If not given, a new session will be
            created for this function.
        :param limit: Limiting returning XComs
        """

    @overload
    @staticmethod
    @internal_api_call
    def get_many(
        execution_date: datetime.datetime,
        key: str | None = None,
        task_ids: str | Iterable[str] | None = None,
        dag_ids: str | Iterable[str] | None = None,
        map_indexes: int | Iterable[int] | None = None,
        include_prior_dates: bool = False,
        limit: int | None = None,
        session: Session = NEW_SESSION,
    ) -> Query:
        """Composes a query to get one or more XCom entries.

        :sphinx-autoapi-skip:
        """

    @staticmethod
    @provide_session
    @internal_api_call
    def get_many(
        execution_date: datetime.datetime | None = None,
        key: str | None = None,
        task_ids: str | Iterable[str] | None = None,
        dag_ids: str | Iterable[str] | None = None,
        map_indexes: int | Iterable[int] | None = None,
        include_prior_dates: bool = False,
        limit: int | None = None,
        session: Session = NEW_SESSION,
        *,
        run_id: str | None = None,
    ) -> Query:
        """Composes a query to get one or more XCom entries.

        The returned query is ordered by DAG run execution date and then XCom
        timestamp, both descending.

        :raises ValueError: If not exactly one of ``run_id`` and
            ``execution_date`` is given.

        :sphinx-autoapi-skip:
        """
        from airflow.models.dagrun import DagRun

        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError(
                f"Exactly one of run_id or execution_date must be passed. "
                f"Passed execution_date={execution_date}, run_id={run_id}"
            )
        if execution_date is not None:
            message = "Passing 'execution_date' to 'XCom.get_many()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)

        query = session.query(BaseXCom).join(BaseXCom.dag_run)

        if key:
            query = query.filter(BaseXCom.key == key)

        # Each filter accepts either a single value or a container of values.
        if is_container(task_ids):
            query = query.filter(BaseXCom.task_id.in_(task_ids))
        elif task_ids is not None:
            query = query.filter(BaseXCom.task_id == task_ids)

        if is_container(dag_ids):
            query = query.filter(BaseXCom.dag_id.in_(dag_ids))
        elif dag_ids is not None:
            query = query.filter(BaseXCom.dag_id == dag_ids)

        # A contiguous range becomes a pair of bounds instead of an IN list.
        if isinstance(map_indexes, range) and map_indexes.step == 1:
            query = query.filter(
                BaseXCom.map_index >= map_indexes.start, BaseXCom.map_index < map_indexes.stop
            )
        elif is_container(map_indexes):
            query = query.filter(BaseXCom.map_index.in_(map_indexes))
        elif map_indexes is not None:
            query = query.filter(BaseXCom.map_index == map_indexes)

        if include_prior_dates:
            if execution_date is not None:
                query = query.filter(DagRun.execution_date <= execution_date)
            else:
                dr = session.query(DagRun.execution_date).filter(DagRun.run_id == run_id).subquery()
                query = query.filter(BaseXCom.execution_date <= dr.c.execution_date)
        elif execution_date is not None:
            query = query.filter(DagRun.execution_date == execution_date)
        else:
            query = query.filter(BaseXCom.run_id == run_id)

        query = query.order_by(DagRun.execution_date.desc(), BaseXCom.timestamp.desc())
        if limit:
            return query.limit(limit)
        return query

    @classmethod
    @provide_session
    def delete(cls, xcoms: XCom | Iterable[XCom], session: Session = NEW_SESSION) -> None:
        """Delete one or multiple XCom entries.

        Each entry is passed to ``XCom.purge`` before being deleted from the
        session; the session is committed once after all entries are handled.

        :param xcoms: A single XCom or an iterable of XComs.
        :param session: Database session.
        :raises TypeError: If any item is not an ``XCom`` instance.
        """
        if isinstance(xcoms, XCom):
            xcoms = [xcoms]
        for xcom in xcoms:
            if not isinstance(xcom, XCom):
                raise TypeError(f"Expected XCom; received {xcom.__class__.__name__}")
            XCom.purge(xcom, session)
            session.delete(xcom)
        session.commit()

    @staticmethod
    def purge(xcom: XCom, session: Session) -> None:
        """Purge an XCom entry from underlying storage implementations."""
        pass

    @overload
    @staticmethod
    @internal_api_call
    def clear(
        *,
        dag_id: str,
        task_id: str,
        run_id: str,
        map_index: int | None = None,
        session: Session = NEW_SESSION,
    ) -> None:
        """Clear all XCom data from the database for the given task instance.

        A deprecated form of this function accepts ``execution_date`` instead of
        ``run_id``. The two arguments are mutually exclusive.

        :param dag_id: ID of DAG to clear the XCom for.
        :param task_id: ID of task to clear the XCom for.
        :param run_id: ID of DAG run to clear the XCom for.
        :param map_index: If given, only clear XCom from this particular mapped
            task. The default ``None`` clears *all* XComs from the task.
        :param session: Database session. If not given, a new session will be
            created for this function.
        """

    @overload
    @staticmethod
    @internal_api_call
    def clear(
        execution_date: pendulum.DateTime,
        dag_id: str,
        task_id: str,
        session: Session = NEW_SESSION,
    ) -> None:
        """Clear all XCom data from the database for the given task instance.

        :sphinx-autoapi-skip:
        """

    @staticmethod
    @provide_session
    @internal_api_call
    def clear(
        execution_date: pendulum.DateTime | None = None,
        dag_id: str | None = None,
        task_id: str | None = None,
        session: Session = NEW_SESSION,
        *,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> None:
        """Clear all XCom data from the database for the given task instance.

        Every matching row is passed to ``XCom.purge`` and deleted, then the
        session is committed.

        :raises TypeError: If ``dag_id`` or ``task_id`` is missing.
        :raises ValueError: If not exactly one of ``run_id`` and
            ``execution_date`` is given.

        :sphinx-autoapi-skip:
        """
        from airflow.models import DagRun

        # Given the historic order of this function (execution_date was first argument) to add a new optional
        # param we need to add default values for everything :(
        if dag_id is None:
            raise TypeError("clear() missing required argument: dag_id")
        if task_id is None:
            raise TypeError("clear() missing required argument: task_id")

        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError(
                f"Exactly one of run_id or execution_date must be passed. "
                f"Passed execution_date={execution_date}, run_id={run_id}"
            )

        if execution_date is not None:
            message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
            run_id = (
                session.query(DagRun.run_id)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .scalar()
            )

        query = session.query(BaseXCom).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
        if map_index is not None:
            query = query.filter_by(map_index=map_index)

        for xcom in query:
            XCom.purge(xcom, session)
            session.delete(xcom)

        session.commit()

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> Any:
        """Serialize XCom value to str or pickled object.

        Pickles the value when ``core.enable_xcom_pickling`` is enabled;
        otherwise encodes it as UTF-8 JSON using ``XComEncoder``. The keyword
        arguments are accepted but not used by this implementation.

        :raises ValueError, TypeError: Re-raised after logging when JSON
            encoding fails.
        """
        if conf.getboolean("core", "enable_xcom_pickling"):
            return pickle.dumps(value)
        try:
            return json.dumps(value, cls=XComEncoder).encode("UTF-8")
        except (ValueError, TypeError) as ex:
            log.error(
                "%s."
                " If you are using pickle instead of JSON for XCom,"
                " then you need to enable pickle support for XCom"
                " in your airflow config or make sure to decorate your"
                " object with attr.",
                ex,
            )
            raise

    @staticmethod
    def _deserialize_value(result: XCom, orm: bool) -> Any:
        """Decode ``result.value``; shared by both public deserializers.

        :param result: Object exposing the stored payload as ``.value``.
        :param orm: When true, JSON decoding uses ``XComDecoder.orm_object_hook``.
        :return: *None* if the stored value is *None*, else the decoded object.
        """
        object_hook = None
        if orm:
            object_hook = XComDecoder.orm_object_hook

        if result.value is None:
            return None
        if conf.getboolean("core", "enable_xcom_pickling"):
            # NOTE(security): pickle.loads executes arbitrary code from the
            # payload; this path assumes whatever wrote the row is trusted.
            try:
                return pickle.loads(result.value)
            except pickle.UnpicklingError:
                # Not a pickle payload; fall through to the JSON decoder below.
                pass
        # With pickling disabled (or unpickling failed), only JSON is attempted.
        return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)

    @staticmethod
    def deserialize_value(result: XCom) -> Any:
        """Deserialize XCom value from str or pickle object."""
        return BaseXCom._deserialize_value(result, False)

    def orm_deserialize_value(self) -> Any:
        """
        Deserialize method which is used to reconstruct ORM XCom object.

        This method should be overridden in custom XCom backends to avoid
        unnecessary request or other resource consuming operations when
        creating XCom orm model. This is used when viewing XCom listing
        in the webserver, for example.
        """
        return BaseXCom._deserialize_value(self, True)
class LazyXComSelectSequence(LazySelectSequence[Any]):
    """Lazy, list-like view over XCom values.

    Supplies the two hooks of ``LazySelectSequence`` for XCom rows: how to
    rebuild the select statement and how to turn a fetched row into a value.

    :meta private:
    """

    @staticmethod
    def _rebuild_select(stmt: TextClause) -> Select:
        # Select only the XCom value column, sourced from the given statement.
        value_select = select(XCom.value)
        return value_select.from_statement(stmt)

    @staticmethod
    def _process_row(row: Row) -> Any:
        # Delegate to the resolved XCom backend's deserializer.
        deserialized = XCom.deserialize_value(row)
        return deserialized
def _patch_outdated_serializer(clazz: type[BaseXCom], params: Iterable[str]) -> None:
    """Patch a custom ``serialize_value`` to accept the modern signature.

    To give custom XCom backends more flexibility with how they store values, we
    now forward all params passed to ``XCom.set`` to ``XCom.serialize_value``.
    In order to maintain compatibility with custom XCom backends written with
    the old signature, we check the signature and, if necessary, patch with a
    method that ignores kwargs the backend does not accept.

    :param clazz: XCom backend class whose ``serialize_value`` is replaced in place.
    :param params: Names of the parameters the backend's original
        ``serialize_value`` accepts; only these are forwarded to it.
    """
    old_serializer = clazz.serialize_value
    # Materialize once: the shim runs on every call, so a one-shot iterable
    # would otherwise be exhausted after the first use.
    accepted_params = tuple(params)
    backend_name = clazz.__name__

    @wraps(old_serializer)
    def _shim(**kwargs):
        # Keep only what the old signature understands; absent names become None.
        kwargs = {k: kwargs.get(k) for k in accepted_params}
        warnings.warn(
            f"Method `serialize_value` in XCom backend {backend_name} is using outdated signature and "
            f"must be updated to accept all params in `BaseXCom.set` except `session`. Support will be "
            f"removed in a future release.",
            RemovedInAirflow3Warning,
            stacklevel=1,
        )
        return old_serializer(**kwargs)

    clazz.serialize_value = _shim  # type: ignore[assignment]
759def _get_function_params(function) -> list[str]:
760 """
761 Return the list of variables names of a function.
763 :param function: The function to inspect
764 """
765 parameters = inspect.signature(function).parameters
766 bound_arguments = [
767 name for name, p in parameters.items() if p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
768 ]
769 return bound_arguments
def resolve_xcom_backend() -> type[BaseXCom]:
    """Resolve custom XCom class.

    Reads the ``core.xcom_backend`` setting (falling back to ``BaseXCom``),
    confirms the configured class extends ``BaseXCom``, and compares the
    parameter names of its ``serialize_value`` with the base implementation's.
    When they differ, the backend's serializer is patched so it only receives
    the parameters it declares.

    :raises TypeError: If the configured class is not a ``BaseXCom`` subclass.
    """
    default_path = f"airflow.models.xcom.{BaseXCom.__name__}"
    backend = conf.getimport("core", "xcom_backend", fallback=default_path)
    if not backend:
        return BaseXCom

    if not issubclass(backend, BaseXCom):
        raise TypeError(
            f"Your custom XCom class `{backend.__name__}` is not a subclass of `{BaseXCom.__name__}`."
        )

    expected_names = set(_get_function_params(BaseXCom.serialize_value))
    backend_names = _get_function_params(backend.serialize_value)
    if expected_names != set(backend_names):
        # Outdated signature: wrap it so unknown kwargs are dropped.
        _patch_outdated_serializer(clazz=backend, params=backend_names)
    return backend
# Bind the public ``XCom`` name. Type checkers see a plain alias of
# ``BaseXCom``; at runtime the name is whatever ``resolve_xcom_backend()``
# returns.
if TYPE_CHECKING:
    XCom = BaseXCom  # Hack to avoid Mypy "Variable 'XCom' is not valid as a type".
else:
    XCom = resolve_xcom_backend()