Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/models/log.py: 48%
40 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from sqlalchemy import Column, Index, Integer, String, Text
22from airflow.models.base import Base, StringID
23from airflow.utils import timezone
24from airflow.utils.sqlalchemy import UtcDateTime
27class Log(Base):
28 """Used to actively log events to the database."""
30 __tablename__ = "log"
32 id = Column(Integer, primary_key=True)
33 dttm = Column(UtcDateTime)
34 dag_id = Column(StringID())
35 task_id = Column(StringID())
36 map_index = Column(Integer)
37 event = Column(String(30))
38 execution_date = Column(UtcDateTime)
39 owner = Column(String(500))
40 extra = Column(Text)
42 __table_args__ = (
43 Index("idx_log_dag", dag_id),
44 Index("idx_log_dttm", dttm),
45 Index("idx_log_event", event),
46 )
48 def __init__(self, event, task_instance=None, owner=None, extra=None, **kwargs):
49 self.dttm = timezone.utcnow()
50 self.event = event
51 self.extra = extra
53 task_owner = None
55 if task_instance:
56 self.dag_id = task_instance.dag_id
57 self.task_id = task_instance.task_id
58 self.execution_date = task_instance.execution_date
59 self.map_index = task_instance.map_index
60 if getattr(task_instance, "task", None):
61 task_owner = task_instance.task.owner
63 if "task_id" in kwargs:
64 self.task_id = kwargs["task_id"]
65 if "dag_id" in kwargs:
66 self.dag_id = kwargs["dag_id"]
67 if kwargs.get("execution_date"):
68 self.execution_date = kwargs["execution_date"]
69 if "map_index" in kwargs:
70 self.map_index = kwargs["map_index"]
72 self.owner = owner or task_owner
74 def __str__(self) -> str:
75 return f"Log({self.event}, {self.task_id}, {self.owner}, {self.extra})"