1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20from sqlalchemy import Column, Index, Integer, String, Text
21
22from airflow.models.base import Base, StringID
23from airflow.utils import timezone
24from airflow.utils.sqlalchemy import UtcDateTime
25
26
class Log(Base):
    """ORM model backing the ``log`` table, used to actively record events in the database."""

    __tablename__ = "log"

    # Column declaration order is kept as-is.
    id = Column(Integer, primary_key=True)
    dttm = Column(UtcDateTime)
    dag_id = Column(StringID())
    task_id = Column(StringID())
    map_index = Column(Integer)
    event = Column(String(60))
    execution_date = Column(UtcDateTime)
    run_id = Column(StringID())
    owner = Column(String(500))
    owner_display_name = Column(String(500))
    extra = Column(Text)

    __table_args__ = (
        Index("idx_log_dag", dag_id),
        Index("idx_log_dttm", dttm),
        Index("idx_log_event", event),
    )

    def __init__(self, event, task_instance=None, owner=None, owner_display_name=None, extra=None, **kwargs):
        """
        Build a log record stamped with the current time from ``timezone.utcnow()``.

        :param event: value stored in the ``event`` column.
        :param task_instance: optional object exposing ``dag_id``, ``task_id``,
            ``execution_date``, ``run_id`` and ``map_index``; when truthy, those values are
            copied onto the record. If it also carries a truthy ``task`` attribute, that
            task's ``owner`` becomes the fallback owner.
        :param owner: owner to record; a falsy value falls back to the task's owner (if any).
        :param owner_display_name: display name to record; a falsy value is stored as ``None``.
        :param extra: value stored in the ``extra`` column.
        :param kwargs: optional overrides applied after ``task_instance``. ``task_id``,
            ``dag_id`` and ``map_index`` override whenever the key is present;
            ``execution_date`` and ``run_id`` override only when their value is truthy.
            Any other keys are ignored.
        """
        self.dttm = timezone.utcnow()
        self.event = event
        self.extra = extra

        fallback_owner = None

        if task_instance:
            # Copy the identifying fields straight off the task instance.
            for field in ("dag_id", "task_id", "execution_date", "run_id", "map_index"):
                setattr(self, field, getattr(task_instance, field))
            if getattr(task_instance, "task", None):
                fallback_owner = task_instance.task.owner

        # Presence of the key is sufficient for these, even if the value is falsy.
        for key in ("task_id", "dag_id"):
            if key in kwargs:
                setattr(self, key, kwargs[key])

        # These only take effect when a truthy value is supplied.
        for key in ("execution_date", "run_id"):
            if kwargs.get(key):
                setattr(self, key, kwargs[key])

        if "map_index" in kwargs:
            self.map_index = kwargs["map_index"]

        self.owner = owner if owner else fallback_owner
        self.owner_display_name = owner_display_name if owner_display_name else None

    def __str__(self) -> str:
        """Return a short summary: event, task id, owner, owner display name and extra."""
        return f"Log({self.event}, {self.task_id}, {self.owner}, {self.owner_display_name}, {self.extra})"