Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/session.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

45 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17from __future__ import annotations 

18 

19import contextlib 

20from functools import wraps 

21from inspect import signature 

22from typing import Callable, Generator, TypeVar, cast 

23 

24from sqlalchemy.orm import Session as SASession 

25 

26from airflow import settings 

27from airflow.api_internal.internal_api_call import InternalApiConfig 

28from airflow.settings import TracebackSession 

29from airflow.typing_compat import ParamSpec 

30 

31 

32@contextlib.contextmanager 

33def create_session() -> Generator[SASession, None, None]: 

34 """Contextmanager that will create and teardown a session.""" 

35 if InternalApiConfig.get_use_internal_api(): 

36 yield TracebackSession() 

37 return 

38 Session = getattr(settings, "Session", None) 

39 if Session is None: 

40 raise RuntimeError("Session must be set before!") 

41 session = Session() 

42 try: 

43 yield session 

44 session.commit() 

45 except Exception: 

46 session.rollback() 

47 raise 

48 finally: 

49 session.close() 

50 

51 

52PS = ParamSpec("PS") 

53RT = TypeVar("RT") 

54 

55 

56def find_session_idx(func: Callable[PS, RT]) -> int: 

57 """Find session index in function call parameter.""" 

58 func_params = signature(func).parameters 

59 try: 

60 # func_params is an ordered dict -- this is the "recommended" way of getting the position 

61 session_args_idx = tuple(func_params).index("session") 

62 except ValueError: 

63 raise ValueError(f"Function {func.__qualname__} has no `session` argument") from None 

64 

65 return session_args_idx 

66 

67 

68def provide_session(func: Callable[PS, RT]) -> Callable[PS, RT]: 

69 """ 

70 Provide a session if it isn't provided. 

71 

72 If you want to reuse a session or run the function as part of a 

73 database transaction, you pass it to the function, if not this wrapper 

74 will create one and close it for you. 

75 """ 

76 session_args_idx = find_session_idx(func) 

77 

78 @wraps(func) 

79 def wrapper(*args, **kwargs) -> RT: 

80 if "session" in kwargs or session_args_idx < len(args): 

81 return func(*args, **kwargs) 

82 else: 

83 with create_session() as session: 

84 return func(*args, session=session, **kwargs) 

85 

86 return wrapper 

87 

88 

89# A fake session to use in functions decorated by provide_session. This allows 

90# the 'session' argument to be of type Session instead of Session | None, 

91# making it easier to type hint the function body without dealing with the None 

92# case that can never happen at runtime. 

93NEW_SESSION: SASession = cast(SASession, None)