Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/orm_event_handlers.py: 47%
45 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import logging
21import os
22import time
23import traceback
25import sqlalchemy.orm.mapper
26from sqlalchemy import event, exc
28from airflow.configuration import conf
30log = logging.getLogger(__name__)
def setup_event_handlers(engine):
    """
    Set up SQLAlchemy event handlers on the given engine.

    The following listeners are registered:

    * ``before_configured`` on the ORM mapper: runs ``import_all_models`` once.
    * ``connect``: records the pid of the process that created the connection.
    * ``connect`` (sqlite only): turns on foreign key enforcement.
    * ``connect`` (mysql only): sets the session time zone to UTC.
    * ``checkout``: refuses to hand out a connection created by another pid.
    * ``before_cursor_execute`` / ``after_cursor_execute``: only when the
      ``[debug] sqlalchemy_stats`` option is true; logs the duration and the
      calling location of every statement.

    :param engine: the SQLAlchemy engine to attach the listeners to.
    """
    # Imported here rather than at module level, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from airflow.models import import_all_models

    event.listen(sqlalchemy.orm.mapper, "before_configured", import_all_models, once=True)

    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        # Remember which process opened the connection; compared in ``checkout``.
        connection_record.info["pid"] = os.getpid()

    if engine.dialect.name == "sqlite":

        @event.listens_for(engine, "connect")
        def set_sqlite_pragma(dbapi_connection, connection_record):
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("PRAGMA foreign_keys=ON")
            finally:
                # Close the cursor even if the PRAGMA statement fails.
                cursor.close()

    # this ensures coherence in mysql when storing datetimes (not required for postgres)
    if engine.dialect.name == "mysql":

        @event.listens_for(engine, "connect")
        def set_mysql_timezone(dbapi_connection, connection_record):
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("SET time_zone = '+00:00'")
            finally:
                # Close the cursor even if the SET statement fails.
                cursor.close()

    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            # The connection was opened by a different process (e.g. before a
            # fork): detach it from the record and proxy, then reject it.
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                f"Connection record belongs to pid {connection_record.info['pid']}, "
                f"attempting to check out in pid {pid}"
            )

    if conf.getboolean("debug", "sqlalchemy_stats", fallback=False):

        @event.listens_for(engine, "before_cursor_execute")
        def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            # Push the start time; the matching ``after_cursor_execute`` pops it.
            conn.info.setdefault("query_start_time", []).append(time.perf_counter())

        @event.listens_for(engine, "after_cursor_execute")
        def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
            total = time.perf_counter() - conn.info["query_start_time"].pop()
            # Nothing is pushed back onto the timing stack here: every entry is
            # added by ``before_cursor_execute`` and consumed by the pop above,
            # so the list does not grow over the lifetime of the connection.

            # Walk the stack once and keep only frames outside of SQLAlchemy.
            stack = [f for f in traceback.extract_stack() if "sqlalchemy" not in f.filename]
            if stack:
                last = stack[-1]
                file_name = f"'{last.name}':{last.filename}:{last.lineno}"
            else:
                # Every frame was filtered out (e.g. the install path itself
                # contains "sqlalchemy"); do not fail the query over a log line.
                file_name = "<unknown>"
            # Short "file:function" trail of the three innermost remaining frames.
            stack_info = ">".join([f"{f.filename.rpartition('/')[-1]}:{f.name}" for f in stack][-3:])
            log.info(
                "@SQLALCHEMY %s |$ %s |$ %s |$ %s ",
                total,
                file_name,
                stack_info,
                statement.replace("\n", " "),
            )