Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/task_instance_session.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17 

18from __future__ import annotations 

19 

20import contextlib 

21import logging 

22import traceback 

23from typing import TYPE_CHECKING 

24 

25from airflow import settings 

26from airflow.api_internal.internal_api_call import InternalApiConfig 

27from airflow.settings import TracebackSession 

28 

29if TYPE_CHECKING: 

30 from sqlalchemy.orm import Session 

31 

# Module-level slot holding the session for the current task instance.
# It is None until set_current_task_instance_session() installs a session or
# get_current_task_instance_session() creates one on demand; both functions
# rebind it through a `global` declaration.
32__current_task_instance_session: Session | None = None 

33 

# Module logger; get_current_task_instance_session() uses it to emit warnings
# when no session has been set.
34log = logging.getLogger(__name__) 

35 

36 

def get_current_task_instance_session() -> Session:
    """
    Return the session stored for the current task instance.

    If a session is already stored in the module-level slot it is returned
    as-is. Otherwise one is created, stored in the slot and returned:

    * when ``InternalApiConfig.get_use_internal_api()`` is truthy, a
      ``TracebackSession`` instance is used;
    * otherwise warnings (including the current call stack) are logged and a
      new ``settings.Session()`` is used.

    :return: the session stored for the current task instance.
    """
    global __current_task_instance_session

    # Fast path: something has already been installed or lazily created.
    if __current_task_instance_session:
        return __current_task_instance_session

    if InternalApiConfig.get_use_internal_api():
        __current_task_instance_session = TracebackSession()
        return __current_task_instance_session

    # Nobody set a session for this task: log where we were called from so
    # the missing set_current_task_instance_session() can be tracked down.
    log.warning("No task session set for this task. Continuing but this likely causes a resource leak.")
    log.warning("Please report this and stacktrace below to https://github.com/apache/airflow/issues")
    for frame_file, frame_lineno, frame_func, frame_source in traceback.extract_stack():
        log.warning('File: "%s", %s , in %s', frame_file, frame_lineno, frame_func)
        if frame_source:
            log.warning(" %s", frame_source.strip())

    __current_task_instance_session = settings.Session()
    return __current_task_instance_session

51 

52 

@contextlib.contextmanager
def set_current_task_instance_session(session: Session):
    """
    Install ``session`` as the current task instance session for a ``with`` block.

    The session is stored in the module-level slot on entry and the slot is
    reset to ``None`` on exit, whether or not the body of the ``with`` block
    raised. The context manager yields nothing.

    :param session: the session to expose through
        ``get_current_task_instance_session()`` while the block is active.
    :raises RuntimeError: if a session is already stored in the slot, i.e.
        another ``set_current_task_instance_session`` block is active or a
        session was created on demand earlier.
    """
    global __current_task_instance_session
    # Compare against None explicitly: the slot is typed ``Session | None`` and
    # "already set" must not depend on the truthiness of the stored object.
    if __current_task_instance_session is not None:
        raise RuntimeError(
            "Session already set for this task. "
            "You can only have one 'set_current_task_instance_session' context manager active at a time."
        )
    __current_task_instance_session = session
    try:
        yield
    finally:
        # Always clear the slot so a later task starts without a session.
        __current_task_instance_session = None