Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/array.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

129 statements  

1# dialects/postgresql/array.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9from __future__ import annotations 

10 

11import re 

12from typing import Any as typing_Any 

13from typing import Iterable 

14from typing import Optional 

15from typing import Sequence 

16from typing import TYPE_CHECKING 

17from typing import TypeVar 

18from typing import Union 

19 

20from .operators import CONTAINED_BY 

21from .operators import CONTAINS 

22from .operators import OVERLAP 

23from ... import types as sqltypes 

24from ... import util 

25from ...sql import expression 

26from ...sql import operators 

27from ...sql.visitors import InternalTraversal 

28 

29if TYPE_CHECKING: 

30 from ...engine.interfaces import Dialect 

31 from ...sql._typing import _ColumnExpressionArgument 

32 from ...sql._typing import _TypeEngineArgument 

33 from ...sql.elements import ColumnElement 

34 from ...sql.elements import Grouping 

35 from ...sql.expression import BindParameter 

36 from ...sql.operators import OperatorType 

37 from ...sql.selectable import _SelectIterable 

38 from ...sql.type_api import _BindProcessorType 

39 from ...sql.type_api import _LiteralProcessorType 

40 from ...sql.type_api import _ResultProcessorType 

41 from ...sql.type_api import TypeEngine 

42 from ...sql.visitors import _TraverseInternalsType 

43 from ...util.typing import Self 

44 

45 

46_T = TypeVar("_T", bound=typing_Any) 

47 

48 

def Any(
    other: typing_Any,
    arrexpr: _ColumnExpressionArgument[_T],
    operator: OperatorType = operators.eq,
) -> ColumnElement[bool]:
    """Function form of the ARRAY-level :meth:`.ARRAY.Comparator.any` method.

    This simply forwards to ``arrexpr.any(other, operator)``; refer to that
    method for the full description.

    :param other: the value handed to ``any()`` as its first argument.
    :param arrexpr: the array expression whose ``any()`` method is invoked.
    :param operator: comparison operator forwarded to ``any()``; defaults
     to ``operators.eq``.

    """

    any_method = arrexpr.any  # type: ignore[union-attr]
    return any_method(other, operator)  # type: ignore[no-any-return]

60 

61 

def All(
    other: typing_Any,
    arrexpr: _ColumnExpressionArgument[_T],
    operator: OperatorType = operators.eq,
) -> ColumnElement[bool]:
    """Function form of the ARRAY-level :meth:`.ARRAY.Comparator.all` method.

    This simply forwards to ``arrexpr.all(other, operator)``; refer to that
    method for the full description.

    :param other: the value handed to ``all()`` as its first argument.
    :param arrexpr: the array expression whose ``all()`` method is invoked.
    :param operator: comparison operator forwarded to ``all()``; defaults
     to ``operators.eq``.

    """

    all_method = arrexpr.all  # type: ignore[union-attr]
    return all_method(other, operator)  # type: ignore[no-any-return]

73 

74 

class array(expression.ExpressionClauseList[_T]):
    """A PostgreSQL ARRAY literal.

    Use this construct to render ARRAY literals inside SQL expressions,
    e.g.::

        from sqlalchemy.dialects.postgresql import array
        from sqlalchemy.dialects import postgresql
        from sqlalchemy import select, func

        stmt = select(array([1, 2]) + array([3, 4, 5]))

        print(stmt.compile(dialect=postgresql.dialect()))

    Produces the SQL:

    .. sourcecode:: sql

        SELECT ARRAY[%(param_1)s, %(param_2)s] ||
            ARRAY[%(param_3)s, %(param_4)s, %(param_5)s] AS anon_1

    The datatype of an :class:`.array` instance is always
    :class:`_types.ARRAY`.  Its "inner" type is taken from the first
    element present, unless the :paramref:`_postgresql.array.type_`
    keyword argument is given::

        array(["foo", "bar"], type_=CHAR)

    The :paramref:`_postgresql.array.type_` argument matters most for an
    empty array, because the PostgreSQL server typically requires a cast
    of the inner type in order to render an empty array.  SQLAlchemy's
    compilation of the empty array emits that cast, so that::

        stmt = array([], type_=Integer)
        print(stmt.compile(dialect=postgresql.dialect()))

    Produces:

    .. sourcecode:: sql

        ARRAY[]::INTEGER[]

    as PostgreSQL requires for empty arrays.

    .. versionadded:: 2.0.40 added support to render empty PostgreSQL array
       literals with a required cast.

    Nesting :class:`.array` constructs yields multidimensional arrays.
    The dimensionality of the resulting :class:`_types.ARRAY` type is
    computed by recursively adding the dimensions of the inner
    :class:`_types.ARRAY` type::

        stmt = select(
            array(
                [
                    array([1, 2]),
                    array([3, 4]),
                    array([column("q"), column("x")]),
                ]
            )
        )
        print(stmt.compile(dialect=postgresql.dialect()))

    Produces:

    .. sourcecode:: sql

        SELECT ARRAY[
            ARRAY[%(param_1)s, %(param_2)s],
            ARRAY[%(param_3)s, %(param_4)s],
            ARRAY[q, x]
        ] AS anon_1

    .. versionadded:: 1.3.6 added support for multidimensional array literals

    .. seealso::

        :class:`_postgresql.ARRAY`

    """  # noqa: E501

    __visit_name__ = "array"

    stringify_dialect = "postgresql"

    _traverse_internals: _TraverseInternalsType = [
        ("clauses", InternalTraversal.dp_clauseelement_tuple),
        ("type", InternalTraversal.dp_type),
    ]

    def __init__(
        self,
        clauses: Iterable[_T],
        *,
        type_: Optional[_TypeEngineArgument[_T]] = None,
        **kw: typing_Any,
    ):
        r"""Construct an ARRAY literal.

        :param clauses: iterable, such as a list, of the elements to render
         inside the array
        :param type\_: optional type.  When left out, the type of the first
         element is used, or the null type if there are no elements.

        """
        super().__init__(operators.comma_op, *clauses, **kw)

        # Resolve the element type: an explicit type_ wins, then the type
        # of the first clause, then NULLTYPE for an untyped empty array.
        if type_ is not None:
            main_type = type_
        elif self.clauses:
            main_type = self.clauses[0].type
        else:
            main_type = sqltypes.NULLTYPE

        if not isinstance(main_type, ARRAY):
            self.type = ARRAY(main_type)  # type: ignore[assignment]
        else:
            # The elements are arrays themselves: reuse their item type and
            # add one dimension (an undimensioned inner array counts as 1).
            inner_dimensions = main_type.dimensions
            self.type = ARRAY(
                main_type.item_type,
                dimensions=(
                    2 if inner_dimensions is None else inner_dimensions + 1
                ),
            )  # type: ignore[assignment]

    @property
    def _select_iterable(self) -> _SelectIterable:
        """Return a one-element tuple holding this construct itself."""
        return (self,)

    def _bind_param(
        self,
        operator: OperatorType,
        obj: typing_Any,
        type_: Optional[TypeEngine[_T]] = None,
        _assume_scalar: bool = False,
    ) -> BindParameter[_T]:
        """Wrap ``obj`` in bound parameters for comparison to this array.

        A single unique ``BindParameter`` is produced when ``_assume_scalar``
        is set or when the operator is ``operators.getitem``.  Otherwise
        ``obj`` is iterated and a new :class:`.array` of per-element bound
        parameters is returned.
        """
        if not _assume_scalar and operator is not operators.getitem:
            per_element = [
                self._bind_param(
                    operator, element, _assume_scalar=True, type_=type_
                )
                for element in obj
            ]
            return array(per_element)  # type: ignore[return-value]

        return expression.BindParameter(
            None,
            obj,
            _compared_to_operator=operator,
            type_=type_,
            _compared_to_type=self.type,
            unique=True,
        )

    def self_group(
        self, against: Optional[OperatorType] = None
    ) -> Union[Self, Grouping[_T]]:
        """Return this construct wrapped in a ``Grouping`` when used against
        ``any_op``, ``all_op`` or ``getitem``; otherwise return it as is.
        """
        needs_grouping = against in (
            operators.any_op,
            operators.all_op,
            operators.getitem,
        )
        return expression.Grouping(self) if needs_grouping else self

235 

236 

class ARRAY(sqltypes.ARRAY[_T]):
    """PostgreSQL ARRAY type.

    :class:`_postgresql.ARRAY` is constructed the same way as the core
    :class:`_types.ARRAY` type: a member type is required, and passing a
    number of dimensions is recommended when the type is used for more
    than one dimension::

        from sqlalchemy.dialects import postgresql

        mytable = Table(
            "mytable",
            metadata,
            Column("data", postgresql.ARRAY(Integer, dimensions=2)),
        )

    All operations defined on the core :class:`_types.ARRAY` type are
    available, including support for "dimensions", indexed access, and
    simple matching such as :meth:`.types.ARRAY.Comparator.any` and
    :meth:`.types.ARRAY.Comparator.all`.  In addition, the
    :class:`_postgresql.ARRAY` class supplies PostgreSQL-specific methods
    for containment operations, namely
    :meth:`.postgresql.ARRAY.Comparator.contains`,
    :meth:`.postgresql.ARRAY.Comparator.contained_by`, and
    :meth:`.postgresql.ARRAY.Comparator.overlap`, e.g.::

        mytable.c.data.contains([1, 2])

    Indexed access is one-based by default, matching PostgreSQL; set
    :paramref:`_postgresql.ARRAY.zero_indexes` for zero-based indexed
    access.

    Note also that the :class:`_postgresql.ARRAY` type does not work
    directly in conjunction with the :class:`.ENUM` type.  For a
    workaround, see the special type at :ref:`postgresql_array_of_enum`.

    .. container:: topic

        **Detecting Changes in ARRAY columns when using the ORM**

        When used with the SQLAlchemy ORM, the :class:`_postgresql.ARRAY`
        type does not detect in-place mutations to the array.  To detect
        them, use the :mod:`sqlalchemy.ext.mutable` extension together
        with the :class:`.MutableList` class::

            from sqlalchemy.dialects.postgresql import ARRAY
            from sqlalchemy.ext.mutable import MutableList


            class SomeOrmClass(Base):
                # ...

                data = Column(MutableList.as_mutable(ARRAY(Integer)))

        With this extension, "in-place" changes to the array such as
        ``.append()`` produce events that the unit of work detects.  Note
        that changes to elements **inside** the array, including subarrays
        that are mutated in place, are **not** detected.

        Alternatively, assigning a new array value to an ORM element that
        replaces the old one will always trigger a change event.

    .. seealso::

        :class:`_types.ARRAY` - base array type

        :class:`_postgresql.array` - produces a literal array value.

    """

    def __init__(
        self,
        item_type: _TypeEngineArgument[_T],
        as_tuple: bool = False,
        dimensions: Optional[int] = None,
        zero_indexes: bool = False,
    ):
        """Construct an ARRAY.

        E.g.::

          Column("myarray", ARRAY(Integer))

        Arguments are:

        :param item_type: The data type of items of this array. Note that
          dimensionality is irrelevant here, so multi-dimensional arrays like
          ``INTEGER[][]``, are constructed as ``ARRAY(Integer)``, not as
          ``ARRAY(ARRAY(Integer))`` or such.  A type class is accepted and
          is instantiated with no arguments.

        :param as_tuple=False: Specify whether return results
          should be converted to tuples from lists. DBAPIs such
          as psycopg2 return lists by default. When tuples are
          returned, the results are hashable.

        :param dimensions: if non-None, the ARRAY will assume a fixed
         number of dimensions.  This will cause the DDL emitted for this
         ARRAY to include the exact number of bracket clauses ``[]``,
         and will also optimize the performance of the type overall.
         Note that PG arrays are always implicitly "non-dimensioned",
         meaning they can store any number of dimensions no matter how
         they were declared.

        :param zero_indexes=False: when True, index values will be converted
         between Python zero-based and PostgreSQL one-based indexes, e.g.
         a value of one will be added to all index values before passing
         to the database.

        :raises ValueError: if ``item_type`` is itself an ``ARRAY`` instance.

        """
        if isinstance(item_type, ARRAY):
            raise ValueError(
                "Do not nest ARRAY types; ARRAY(basetype) "
                "handles multi-dimensional arrays of basetype"
            )
        # Accept either a type class or an already constructed instance.
        self.item_type = (
            item_type() if isinstance(item_type, type) else item_type
        )
        self.as_tuple = as_tuple
        self.dimensions = dimensions
        self.zero_indexes = zero_indexes

    class Comparator(sqltypes.ARRAY.Comparator[_T]):
        """Define comparison operations for :class:`_types.ARRAY`.

        These operations come on top of those supplied by the base
        :class:`.types.ARRAY.Comparator` class, including
        :meth:`.types.ARRAY.Comparator.any` and
        :meth:`.types.ARRAY.Comparator.all`.

        """

        def contains(
            self, other: typing_Any, **kwargs: typing_Any
        ) -> ColumnElement[bool]:
            """Boolean expression.  Test if elements are a superset of the
            elements of the argument array expression.

            ``kwargs`` is not used by this operator; it is accepted only
            for API conformance.
            """
            return self.operate(CONTAINS, other, result_type=sqltypes.Boolean)

        def contained_by(self, other: typing_Any) -> ColumnElement[bool]:
            """Boolean expression.  Test if elements are a subset of the
            elements of the argument array expression.
            """
            return self.operate(
                CONTAINED_BY, other, result_type=sqltypes.Boolean
            )

        def overlap(self, other: typing_Any) -> ColumnElement[bool]:
            """Boolean expression.  Test if array has elements in common with
            an argument array expression.
            """
            return self.operate(OVERLAP, other, result_type=sqltypes.Boolean)

    comparator_factory = Comparator

    @util.memoized_property
    def _against_native_enum(self) -> bool:
        """Whether the item type is an ``Enum`` with ``native_enum`` set."""
        member_type = self.item_type
        return (
            isinstance(member_type, sqltypes.Enum)
            and member_type.native_enum  # type: ignore[attr-defined]
        )

    def literal_processor(
        self, dialect: Dialect
    ) -> Optional[_LiteralProcessorType[_T]]:
        """Return a callable rendering a Python sequence as an inline
        ``ARRAY[...]`` literal, or ``None`` when the item type has no
        literal processor for ``dialect``.
        """
        item_impl = self.item_type.dialect_impl(dialect)
        item_proc = item_impl.literal_processor(dialect)
        if item_proc is None:
            return None

        def to_str(elements: Iterable[typing_Any]) -> str:
            joined = ", ".join(elements)
            return f"ARRAY[{joined}]"

        def process(value: Sequence[typing_Any]) -> str:
            return self._apply_item_processor(
                value, item_proc, self.dimensions, to_str
            )

        return process

    def bind_processor(
        self, dialect: Dialect
    ) -> Optional[_BindProcessorType[Sequence[typing_Any]]]:
        """Return a callable that runs the item type's bind processor over
        a bound sequence and collects the result into lists; ``None``
        values pass through untouched.
        """
        item_impl = self.item_type.dialect_impl(dialect)
        item_proc = item_impl.bind_processor(dialect)

        def process(
            value: Optional[Sequence[typing_Any]],
        ) -> Optional[list[typing_Any]]:
            if value is None:
                return None
            return self._apply_item_processor(
                value, item_proc, self.dimensions, list
            )

        return process

    def result_processor(
        self, dialect: Dialect, coltype: object
    ) -> _ResultProcessorType[Sequence[typing_Any]]:
        """Return a callable converting a fetched array value.

        Items are passed through the item type's result processor and
        collected into tuples when ``as_tuple`` is set, lists otherwise.
        When the item type is a native enum, a value that arrives as a raw
        ``{...}`` string is first split into its elements.
        """
        item_impl = self.item_type.dialect_impl(dialect)
        item_proc = item_impl.result_processor(dialect, coltype)

        def convert_items(
            value: Sequence[typing_Any],
        ) -> Optional[Sequence[typing_Any]]:
            if value is None:
                return None
            # as_tuple is read on every call, not captured up front.
            collection = tuple if self.as_tuple else list
            return self._apply_item_processor(
                value,
                item_proc,
                self.dimensions,
                collection,
            )

        if not self._against_native_enum:
            return convert_items

        braces = re.compile(r"^{(.*)}$")

        def handle_raw_string(value: str) -> list[str]:
            inner = braces.match(value).group(1)  # type: ignore[union-attr]
            return _split_enum_values(inner)

        def convert_enum_items(
            value: Sequence[typing_Any],
        ) -> Optional[Sequence[typing_Any]]:
            if value is None:
                return None
            # isinstance(value, str) is required to handle the case where
            # a TypeDecorator for an Array of Enum is used, as was
            # required in sa < 1.3.17
            if isinstance(value, str):
                value = handle_raw_string(value)
            return convert_items(value)

        return convert_enum_items

486 

487 

488def _split_enum_values(array_string: str) -> list[str]: 

489 if '"' not in array_string: 

490 # no escape char is present so it can just split on the comma 

491 return array_string.split(",") if array_string else [] 

492 

493 # handles quoted strings from: 

494 # r'abc,"quoted","also\\\\quoted", "quoted, comma", "esc \" quot", qpr' 

495 # returns 

496 # ['abc', 'quoted', 'also\\quoted', 'quoted, comma', 'esc " quot', 'qpr'] 

497 text = array_string.replace(r"\"", "_$ESC_QUOTE$_") 

498 text = text.replace(r"\\", "\\") 

499 result = [] 

500 on_quotes = re.split(r'(")', text) 

501 in_quotes = False 

502 for tok in on_quotes: 

503 if tok == '"': 

504 in_quotes = not in_quotes 

505 elif in_quotes: 

506 result.append(tok.replace("_$ESC_QUOTE$_", '"')) 

507 else: 

508 result.extend(re.findall(r"([^\s,]+),?", tok)) 

509 return result