Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/models/log.py: 48%
40 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from sqlalchemy import Column, Index, Integer, String, Text
22from airflow.models.base import Base, StringID
23from airflow.utils import timezone
24from airflow.utils.sqlalchemy import UtcDateTime
27class Log(Base):
28 """Used to actively log events to the database"""
30 __tablename__ = "log"
32 id = Column(Integer, primary_key=True)
33 dttm = Column(UtcDateTime)
34 dag_id = Column(StringID())
35 task_id = Column(StringID())
36 map_index = Column(Integer)
37 event = Column(String(30))
38 execution_date = Column(UtcDateTime)
39 owner = Column(String(500))
40 extra = Column(Text)
42 __table_args__ = (
43 Index("idx_log_dag", dag_id),
44 Index("idx_log_event", event),
45 )
47 def __init__(self, event, task_instance=None, owner=None, extra=None, **kwargs):
48 self.dttm = timezone.utcnow()
49 self.event = event
50 self.extra = extra
52 task_owner = None
54 if task_instance:
55 self.dag_id = task_instance.dag_id
56 self.task_id = task_instance.task_id
57 self.execution_date = task_instance.execution_date
58 self.map_index = task_instance.map_index
59 if getattr(task_instance, "task", None):
60 task_owner = task_instance.task.owner
62 if "task_id" in kwargs:
63 self.task_id = kwargs["task_id"]
64 if "dag_id" in kwargs:
65 self.dag_id = kwargs["dag_id"]
66 if kwargs.get("execution_date"):
67 self.execution_date = kwargs["execution_date"]
68 if "map_index" in kwargs:
69 self.map_index = kwargs["map_index"]
71 self.owner = owner or task_owner
73 def __str__(self) -> str:
74 return f"Log({self.event}, {self.task_id}, {self.owner}, {self.extra})"