1from django.db import NotSupportedError
2from django.db.models.expressions import Func, Value
3from django.db.models.fields import TextField
4from django.db.models.fields.json import JSONField
5from django.db.models.functions import Cast
6
7
class JSONArray(Func):
    """JSON array expression built from the source expressions.

    The result is typed as a ``JSONField``.
    """

    function = "JSON_ARRAY"
    output_field = JSONField()

    def as_sql(self, compiler, connection, **extra_context):
        """Compile via ``Func.as_sql()`` after checking JSON field support.

        Raises ``NotSupportedError`` when the connection's features report
        that JSON fields are not supported.
        """
        if connection.features.supports_json_field:
            return super().as_sql(compiler, connection, **extra_context)
        raise NotSupportedError(
            "JSONFields are not supported on this database backend."
        )

    def as_native(self, compiler, connection, *, returning, **extra_context):
        """Compile with a template ending in a ``RETURNING <returning>`` clause.

        ``returning`` is keyword-only and is interpolated verbatim into the
        SQL template.
        """
        # By default PostgreSQL 16+ and Oracle drop SQL NULL values from the
        # array. NULL ON NULL keeps them, mapped to JSON null, which matches
        # what SQLite does. The clause is left out when there are no source
        # expressions at all.
        if self.get_source_expressions():
            null_clause = "NULL ON NULL"
        else:
            null_clause = ""

        native_template = (
            f"%(function)s(%(expressions)s {null_clause} RETURNING {returning})"
        )
        return self.as_sql(
            compiler,
            connection,
            template=native_template,
            **extra_context,
        )

    def as_postgresql(self, compiler, connection, **extra_context):
        """Compile for PostgreSQL, working on a copy with casted expressions.

        Uses the native form returning ``JSONB`` when the connection's
        features set ``is_postgresql_16``; otherwise compiles with
        ``JSONB_BUILD_ARRAY`` as the function name.
        """
        # The casts are strictly needed only for JSONB_BUILD_ARRAY, or for
        # JSON_ARRAY on PostgreSQL 16+ with server-side bindings. They are
        # applied unconditionally so every code path behaves the same.
        clone = self.copy()
        wrapped = []
        for source in clone.get_source_expressions():
            # An expression that already is a Cast is kept as-is to avoid
            # wrapping it a second time.
            if not isinstance(source, Cast):
                source = Cast(source, source.output_field)
            wrapped.append(source)
        clone.set_source_expressions(wrapped)

        if not connection.features.is_postgresql_16:
            return clone.as_sql(
                compiler,
                connection,
                function="JSONB_BUILD_ARRAY",
                **extra_context,
            )

        return clone.as_native(
            compiler, connection, returning="JSONB", **extra_context
        )

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle using the native form returning ``CLOB``."""
        return self.as_native(
            compiler,
            connection,
            returning="CLOB",
            **extra_context,
        )
66
67
class JSONObject(Func):
    """JSON object expression built from keyword arguments.

    Each keyword name becomes a key (wrapped in ``Value``) and each keyword
    value is used as the matching value expression. The result is typed as a
    ``JSONField``.
    """

    function = "JSON_OBJECT"
    output_field = JSONField()

    def __init__(self, **fields):
        """Store the fields as a flat ``key, value, key, value, ...`` list."""
        flattened = [
            part
            for name, expression in fields.items()
            for part in (Value(name), expression)
        ]
        super().__init__(*flattened)

    def as_sql(self, compiler, connection, **extra_context):
        """Compile via ``Func.as_sql()`` after checking backend support.

        Raises ``NotSupportedError`` when the connection's features report
        that there is no JSON object function.
        """
        if connection.features.has_json_object_function:
            return super().as_sql(compiler, connection, **extra_context)
        raise NotSupportedError(
            "JSONObject() is not supported on this database backend."
        )

    def join(self, args):
        """Render alternating key/value SQL fragments as ``(key) VALUE value``.

        ``args`` is read as keys at even positions and values at odd
        positions. ``zip(..., strict=True)`` raises ``ValueError`` if the two
        halves differ in length.
        """
        key_parts = args[::2]
        value_parts = args[1::2]
        # Keys are parenthesized in case they use the postgres cast :: syntax.
        return ", ".join(
            f"({key_sql}) VALUE {value_sql}"
            for key_sql, value_sql in zip(key_parts, value_parts, strict=True)
        )

    def as_native(self, compiler, connection, *, returning, **extra_context):
        """Compile with a template ending in a ``RETURNING <returning>`` clause.

        This instance is supplied as ``arg_joiner`` so that its ``join()``
        method formats the argument list. ``returning`` is keyword-only and
        is interpolated verbatim into the SQL template.
        """
        native_template = f"%(function)s(%(expressions)s RETURNING {returning})"
        return self.as_sql(
            compiler,
            connection,
            arg_joiner=self,
            template=native_template,
            **extra_context,
        )

    def as_postgresql(self, compiler, connection, **extra_context):
        """Compile for PostgreSQL, working on a copy whose keys are cast to text.

        Uses the native form returning ``JSONB`` when the connection's
        features set ``is_postgresql_16``; otherwise compiles with
        ``JSONB_BUILD_OBJECT`` as the function name.
        """
        # Text casts on the keys are strictly needed only for
        # JSONB_BUILD_OBJECT, or for JSON_OBJECT on PostgreSQL 16+ with
        # server-side bindings. They are applied unconditionally so every code
        # path behaves the same.
        clone = self.copy()
        text_keyed = []
        for position, source in enumerate(clone.get_source_expressions()):
            # Even positions hold keys, odd positions hold values.
            holds_key = position % 2 == 0
            text_keyed.append(Cast(source, TextField()) if holds_key else source)
        clone.set_source_expressions(text_keyed)

        if not connection.features.is_postgresql_16:
            # Start the lookup after JSONObject in the MRO, so this class's
            # own as_sql() (and its feature check) is not the one invoked.
            return super(JSONObject, clone).as_sql(
                compiler,
                connection,
                function="JSONB_BUILD_OBJECT",
                **extra_context,
            )

        return clone.as_native(
            compiler, connection, returning="JSONB", **extra_context
        )

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle using the native form returning ``CLOB``."""
        return self.as_native(
            compiler,
            connection,
            returning="CLOB",
            **extra_context,
        )