Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/json.py: 41%
58 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20import json
21from datetime import date, datetime
22from decimal import Decimal
23from typing import Any
25from flask.json.provider import JSONProvider
27from airflow.serialization.serde import CLASSNAME, DATA, SCHEMA_ID, deserialize, serialize
28from airflow.utils.timezone import convert_to_utc, is_naive
class AirflowJsonProvider(JSONProvider):
    """JSON Provider for Flask app to use WebEncoder."""

    # Defaults applied by ``dumps`` whenever the caller does not pass the
    # corresponding keyword argument explicitly.
    ensure_ascii: bool = True
    sort_keys: bool = True

    def dumps(self, obj, **kwargs):
        """Serialize ``obj`` to a JSON string.

        :param obj: the value to serialize.
        :param kwargs: forwarded to :func:`json.dumps`. ``ensure_ascii`` and
            ``sort_keys`` default to the class attributes of the same name and
            ``cls`` defaults to ``WebEncoder``; explicit values win.
        :return: the JSON text produced by :func:`json.dumps`.
        """
        kwargs.setdefault("ensure_ascii", self.ensure_ascii)
        kwargs.setdefault("sort_keys", self.sort_keys)
        # Use setdefault rather than a hard-coded ``cls=`` keyword so that a
        # caller-supplied ``cls`` overrides the default instead of triggering
        # "got multiple values for keyword argument 'cls'".
        kwargs.setdefault("cls", WebEncoder)
        return json.dumps(obj, **kwargs)

    def loads(self, s: str | bytes, **kwargs):
        """Deserialize JSON text ``s`` by forwarding to :func:`json.loads`.

        :param s: the JSON document, as text or bytes.
        :param kwargs: forwarded unchanged to :func:`json.loads`.
        :return: the decoded Python value.
        """
        return json.loads(s, **kwargs)
class WebEncoder(json.JSONEncoder):
    """Encode values into a web understandable format.

    This encoder is one-way: there is no matching deserializer.
    """

    def default(self, o: Any) -> Any:
        """Return a JSON-serializable stand-in for ``o``.

        :param o: a value the stock JSON encoder could not handle.
        :return: an ISO-format string for ``datetime`` values, a
            ``YYYY-MM-DD`` string for ``date`` values, and otherwise the
            result of ``serialize`` (unwrapped to its ``DATA`` entry for
            ``Decimal`` values and for the legacy class names listed below).
        :raises TypeError: propagated unchanged from ``serialize``.
        """
        # ``datetime`` is a subclass of ``date``, so it has to be tested first.
        if isinstance(o, datetime):
            if is_naive(o):
                o = convert_to_utc(o)
            return o.isoformat()

        if isinstance(o, date):
            return o.strftime("%Y-%m-%d")

        # Serialize exactly once; any TypeError raised here simply propagates
        # (the former ``except TypeError: raise`` wrapper was a no-op).
        data = serialize(o)
        if not isinstance(data, dict):
            return data

        # Decimals are emitted as their bare payload when one is present.
        if isinstance(o, Decimal) and DATA in data:
            return data[DATA]

        if CLASSNAME in data:
            classname = data[CLASSNAME]
            # this is here for backwards compatibility
            if classname.startswith("numpy") or classname == "kubernetes.client.models.v1_pod.V1Pod":
                return data[DATA]

        return data
class XComEncoder(json.JSONEncoder):
    """This encoder serializes any object that has attr, dataclass or a custom serializer."""

    def default(self, o: object) -> Any:
        """Return a JSON-serializable stand-in for ``o``.

        ``serialize`` is tried first; if it raises ``TypeError`` the call
        falls back to :meth:`json.JSONEncoder.default`.

        :param o: a value the stock JSON encoder could not handle.
        :return: whatever ``serialize`` returns for ``o``.
        """
        try:
            return serialize(o)
        except TypeError:
            return super().default(o)

    def encode(self, o: Any) -> str:
        """Encode ``o`` as JSON text, rejecting dicts that use a reserved key.

        :param o: the value to encode.
        :return: the JSON document as a string.
        :raises AttributeError: if ``o`` is a dict containing the
            ``CLASSNAME`` or ``SCHEMA_ID`` key.
        """
        # checked here and in serialize
        if isinstance(o, dict):
            # Report the key that was actually found; CLASSNAME is tested
            # first so the message is unchanged whenever it is present.
            for reserved in (CLASSNAME, SCHEMA_ID):
                if reserved in o:
                    raise AttributeError(f"reserved key {reserved} found in dict to serialize")

        return super().encode(o)
class XComDecoder(json.JSONDecoder):
    """
    JSON decoder that passes every decoded dict through ``deserialize``.

    Dicts that contain the `__classname__` key are turned back into
    objects; any other dict is returned as is.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Keep a truthy caller-supplied hook; otherwise install our own.
        chosen_hook = kwargs.get("object_hook") or self.object_hook
        kwargs["object_hook"] = chosen_hook
        super().__init__(*args, **kwargs)

    def object_hook(self, dct: dict) -> object:
        """Hand the decoded dict to ``deserialize`` and return its result."""
        restored = deserialize(dct)
        return restored

    @staticmethod
    def orm_object_hook(dct: dict) -> object:
        """Creates a readable representation of a serialized object"""
        readable = deserialize(dct, False)
        return readable
# Alias kept for backwards compatibility: the old name ``AirflowJsonEncoder``
# resolves to ``WebEncoder``.
AirflowJsonEncoder = WebEncoder