1# dialects/postgresql/ranges.py
2# Copyright (C) 2013-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8from __future__ import annotations
9
10import dataclasses
11from datetime import date
12from datetime import datetime
13from datetime import timedelta
14from decimal import Decimal
15from typing import Any
16from typing import cast
17from typing import Generic
18from typing import List
19from typing import Optional
20from typing import overload
21from typing import Sequence
22from typing import Tuple
23from typing import Type
24from typing import TYPE_CHECKING
25from typing import TypeVar
26from typing import Union
27
28from .operators import ADJACENT_TO
29from .operators import CONTAINED_BY
30from .operators import CONTAINS
31from .operators import NOT_EXTEND_LEFT_OF
32from .operators import NOT_EXTEND_RIGHT_OF
33from .operators import OVERLAP
34from .operators import STRICTLY_LEFT_OF
35from .operators import STRICTLY_RIGHT_OF
36from ... import types as sqltypes
37from ...sql import operators
38from ...sql.type_api import TypeEngine
39from ...util import py310
40from ...util.typing import Literal
41
42if TYPE_CHECKING:
43 from ...sql.elements import ColumnElement
44 from ...sql.type_api import _TE
45 from ...sql.type_api import TypeEngineMixin
46
47_T = TypeVar("_T", bound=Any)
48
49_BoundsType = Literal["()", "[)", "(]", "[]"]
50
51if py310:
52 dc_slots = {"slots": True}
53 dc_kwonly = {"kw_only": True}
54else:
55 dc_slots = {}
56 dc_kwonly = {}
57
58
59@dataclasses.dataclass(frozen=True, **dc_slots)
60class Range(Generic[_T]):
61 """Represent a PostgreSQL range.
62
63 E.g.::
64
65 r = Range(10, 50, bounds="()")
66
67 The calling style is similar to that of psycopg and psycopg2, in part
68 to allow easier migration from previous SQLAlchemy versions that used
69 these objects directly.
70
71 :param lower: Lower bound value, or None
72 :param upper: Upper bound value, or None
73 :param bounds: keyword-only, optional string value that is one of
74 ``"()"``, ``"[)"``, ``"(]"``, ``"[]"``. Defaults to ``"[)"``.
75 :param empty: keyword-only, optional bool indicating this is an "empty"
76 range
77
78 .. versionadded:: 2.0
79
80 """
81
82 lower: Optional[_T] = None
83 """the lower bound"""
84
85 upper: Optional[_T] = None
86 """the upper bound"""
87
88 if TYPE_CHECKING:
89 bounds: _BoundsType = dataclasses.field(default="[)")
90 empty: bool = dataclasses.field(default=False)
91 else:
92 bounds: _BoundsType = dataclasses.field(default="[)", **dc_kwonly)
93 empty: bool = dataclasses.field(default=False, **dc_kwonly)
94
95 if not py310:
96
97 def __init__(
98 self,
99 lower: Optional[_T] = None,
100 upper: Optional[_T] = None,
101 *,
102 bounds: _BoundsType = "[)",
103 empty: bool = False,
104 ):
105 # no __slots__ either so we can update dict
106 self.__dict__.update(
107 {
108 "lower": lower,
109 "upper": upper,
110 "bounds": bounds,
111 "empty": empty,
112 }
113 )
114
115 def __bool__(self) -> bool:
116 return not self.empty
117
118 @property
119 def isempty(self) -> bool:
120 "A synonym for the 'empty' attribute."
121
122 return self.empty
123
124 @property
125 def is_empty(self) -> bool:
126 "A synonym for the 'empty' attribute."
127
128 return self.empty
129
130 @property
131 def lower_inc(self) -> bool:
132 """Return True if the lower bound is inclusive."""
133
134 return self.bounds[0] == "["
135
136 @property
137 def lower_inf(self) -> bool:
138 """Return True if this range is non-empty and lower bound is
139 infinite."""
140
141 return not self.empty and self.lower is None
142
143 @property
144 def upper_inc(self) -> bool:
145 """Return True if the upper bound is inclusive."""
146
147 return self.bounds[1] == "]"
148
149 @property
150 def upper_inf(self) -> bool:
151 """Return True if this range is non-empty and the upper bound is
152 infinite."""
153
154 return not self.empty and self.upper is None
155
156 @property
157 def __sa_type_engine__(self) -> AbstractSingleRange[_T]:
158 return AbstractSingleRange()
159
160 def _contains_value(self, value: _T) -> bool:
161 """Return True if this range contains the given value."""
162
163 if self.empty:
164 return False
165
166 if self.lower is None:
167 return self.upper is None or (
168 value < self.upper
169 if self.bounds[1] == ")"
170 else value <= self.upper
171 )
172
173 if self.upper is None:
174 return ( # type: ignore
175 value > self.lower
176 if self.bounds[0] == "("
177 else value >= self.lower
178 )
179
180 return ( # type: ignore
181 value > self.lower
182 if self.bounds[0] == "("
183 else value >= self.lower
184 ) and (
185 value < self.upper
186 if self.bounds[1] == ")"
187 else value <= self.upper
188 )
189
190 def _get_discrete_step(self) -> Any:
191 "Determine the “step” for this range, if it is a discrete one."
192
193 # See
194 # https://www.postgresql.org/docs/current/rangetypes.html#RANGETYPES-DISCRETE
195 # for the rationale
196
197 if isinstance(self.lower, int) or isinstance(self.upper, int):
198 return 1
199 elif isinstance(self.lower, datetime) or isinstance(
200 self.upper, datetime
201 ):
202 # This is required, because a `isinstance(datetime.now(), date)`
203 # is True
204 return None
205 elif isinstance(self.lower, date) or isinstance(self.upper, date):
206 return timedelta(days=1)
207 else:
208 return None
209
210 def _compare_edges(
211 self,
212 value1: Optional[_T],
213 bound1: str,
214 value2: Optional[_T],
215 bound2: str,
216 only_values: bool = False,
217 ) -> int:
218 """Compare two range bounds.
219
220 Return -1, 0 or 1 respectively when `value1` is less than,
221 equal to or greater than `value2`.
222
223 When `only_value` is ``True``, do not consider the *inclusivity*
224 of the edges, just their values.
225 """
226
227 value1_is_lower_bound = bound1 in {"[", "("}
228 value2_is_lower_bound = bound2 in {"[", "("}
229
230 # Infinite edges are equal when they are on the same side,
231 # otherwise a lower edge is considered less than the upper end
232 if value1 is value2 is None:
233 if value1_is_lower_bound == value2_is_lower_bound:
234 return 0
235 else:
236 return -1 if value1_is_lower_bound else 1
237 elif value1 is None:
238 return -1 if value1_is_lower_bound else 1
239 elif value2 is None:
240 return 1 if value2_is_lower_bound else -1
241
242 # Short path for trivial case
243 if bound1 == bound2 and value1 == value2:
244 return 0
245
246 value1_inc = bound1 in {"[", "]"}
247 value2_inc = bound2 in {"[", "]"}
248 step = self._get_discrete_step()
249
250 if step is not None:
251 # "Normalize" the two edges as '[)', to simplify successive
252 # logic when the range is discrete: otherwise we would need
253 # to handle the comparison between ``(0`` and ``[1`` that
254 # are equal when dealing with integers while for floats the
255 # former is lesser than the latter
256
257 if value1_is_lower_bound:
258 if not value1_inc:
259 value1 += step
260 value1_inc = True
261 else:
262 if value1_inc:
263 value1 += step
264 value1_inc = False
265 if value2_is_lower_bound:
266 if not value2_inc:
267 value2 += step
268 value2_inc = True
269 else:
270 if value2_inc:
271 value2 += step
272 value2_inc = False
273
274 if value1 < value2: # type: ignore
275 return -1
276 elif value1 > value2: # type: ignore
277 return 1
278 elif only_values:
279 return 0
280 else:
281 # Neither one is infinite but are equal, so we
282 # need to consider the respective inclusive/exclusive
283 # flag
284
285 if value1_inc and value2_inc:
286 return 0
287 elif not value1_inc and not value2_inc:
288 if value1_is_lower_bound == value2_is_lower_bound:
289 return 0
290 else:
291 return 1 if value1_is_lower_bound else -1
292 elif not value1_inc:
293 return 1 if value1_is_lower_bound else -1
294 elif not value2_inc:
295 return -1 if value2_is_lower_bound else 1
296 else:
297 return 0
298
299 def __eq__(self, other: Any) -> bool:
300 """Compare this range to the `other` taking into account
301 bounds inclusivity, returning ``True`` if they are equal.
302 """
303
304 if not isinstance(other, Range):
305 return NotImplemented
306
307 if self.empty and other.empty:
308 return True
309 elif self.empty != other.empty:
310 return False
311
312 slower = self.lower
313 slower_b = self.bounds[0]
314 olower = other.lower
315 olower_b = other.bounds[0]
316 supper = self.upper
317 supper_b = self.bounds[1]
318 oupper = other.upper
319 oupper_b = other.bounds[1]
320
321 return (
322 self._compare_edges(slower, slower_b, olower, olower_b) == 0
323 and self._compare_edges(supper, supper_b, oupper, oupper_b) == 0
324 )
325
326 def contained_by(self, other: Range[_T]) -> bool:
327 "Determine whether this range is a contained by `other`."
328
329 # Any range contains the empty one
330 if self.empty:
331 return True
332
333 # An empty range does not contain any range except the empty one
334 if other.empty:
335 return False
336
337 slower = self.lower
338 slower_b = self.bounds[0]
339 olower = other.lower
340 olower_b = other.bounds[0]
341
342 if self._compare_edges(slower, slower_b, olower, olower_b) < 0:
343 return False
344
345 supper = self.upper
346 supper_b = self.bounds[1]
347 oupper = other.upper
348 oupper_b = other.bounds[1]
349
350 if self._compare_edges(supper, supper_b, oupper, oupper_b) > 0:
351 return False
352
353 return True
354
355 def contains(self, value: Union[_T, Range[_T]]) -> bool:
356 "Determine whether this range contains `value`."
357
358 if isinstance(value, Range):
359 return value.contained_by(self)
360 else:
361 return self._contains_value(value)
362
363 __contains__ = contains
364
365 def overlaps(self, other: Range[_T]) -> bool:
366 "Determine whether this range overlaps with `other`."
367
368 # Empty ranges never overlap with any other range
369 if self.empty or other.empty:
370 return False
371
372 slower = self.lower
373 slower_b = self.bounds[0]
374 supper = self.upper
375 supper_b = self.bounds[1]
376 olower = other.lower
377 olower_b = other.bounds[0]
378 oupper = other.upper
379 oupper_b = other.bounds[1]
380
381 # Check whether this lower bound is contained in the other range
382 if (
383 self._compare_edges(slower, slower_b, olower, olower_b) >= 0
384 and self._compare_edges(slower, slower_b, oupper, oupper_b) <= 0
385 ):
386 return True
387
388 # Check whether other lower bound is contained in this range
389 if (
390 self._compare_edges(olower, olower_b, slower, slower_b) >= 0
391 and self._compare_edges(olower, olower_b, supper, supper_b) <= 0
392 ):
393 return True
394
395 return False
396
397 def strictly_left_of(self, other: Range[_T]) -> bool:
398 "Determine whether this range is completely to the left of `other`."
399
400 # Empty ranges are neither to left nor to the right of any other range
401 if self.empty or other.empty:
402 return False
403
404 supper = self.upper
405 supper_b = self.bounds[1]
406 olower = other.lower
407 olower_b = other.bounds[0]
408
409 # Check whether this upper edge is less than other's lower end
410 return self._compare_edges(supper, supper_b, olower, olower_b) < 0
411
412 __lshift__ = strictly_left_of
413
414 def strictly_right_of(self, other: Range[_T]) -> bool:
415 "Determine whether this range is completely to the right of `other`."
416
417 # Empty ranges are neither to left nor to the right of any other range
418 if self.empty or other.empty:
419 return False
420
421 slower = self.lower
422 slower_b = self.bounds[0]
423 oupper = other.upper
424 oupper_b = other.bounds[1]
425
426 # Check whether this lower edge is greater than other's upper end
427 return self._compare_edges(slower, slower_b, oupper, oupper_b) > 0
428
429 __rshift__ = strictly_right_of
430
431 def not_extend_left_of(self, other: Range[_T]) -> bool:
432 "Determine whether this does not extend to the left of `other`."
433
434 # Empty ranges are neither to left nor to the right of any other range
435 if self.empty or other.empty:
436 return False
437
438 slower = self.lower
439 slower_b = self.bounds[0]
440 olower = other.lower
441 olower_b = other.bounds[0]
442
443 # Check whether this lower edge is not less than other's lower end
444 return self._compare_edges(slower, slower_b, olower, olower_b) >= 0
445
446 def not_extend_right_of(self, other: Range[_T]) -> bool:
447 "Determine whether this does not extend to the right of `other`."
448
449 # Empty ranges are neither to left nor to the right of any other range
450 if self.empty or other.empty:
451 return False
452
453 supper = self.upper
454 supper_b = self.bounds[1]
455 oupper = other.upper
456 oupper_b = other.bounds[1]
457
458 # Check whether this upper edge is not greater than other's upper end
459 return self._compare_edges(supper, supper_b, oupper, oupper_b) <= 0
460
461 def _upper_edge_adjacent_to_lower(
462 self,
463 value1: Optional[_T],
464 bound1: str,
465 value2: Optional[_T],
466 bound2: str,
467 ) -> bool:
468 """Determine whether an upper bound is immediately successive to a
469 lower bound."""
470
471 # Since we need a peculiar way to handle the bounds inclusivity,
472 # just do a comparison by value here
473 res = self._compare_edges(value1, bound1, value2, bound2, True)
474 if res == -1:
475 step = self._get_discrete_step()
476 if step is None:
477 return False
478 if bound1 == "]":
479 if bound2 == "[":
480 return value1 == value2 - step # type: ignore
481 else:
482 return value1 == value2
483 else:
484 if bound2 == "[":
485 return value1 == value2
486 else:
487 return value1 == value2 - step # type: ignore
488 elif res == 0:
489 # Cover cases like [0,0] -|- [1,] and [0,2) -|- (1,3]
490 if (
491 bound1 == "]"
492 and bound2 == "["
493 or bound1 == ")"
494 and bound2 == "("
495 ):
496 step = self._get_discrete_step()
497 if step is not None:
498 return True
499 return (
500 bound1 == ")"
501 and bound2 == "["
502 or bound1 == "]"
503 and bound2 == "("
504 )
505 else:
506 return False
507
508 def adjacent_to(self, other: Range[_T]) -> bool:
509 "Determine whether this range is adjacent to the `other`."
510
511 # Empty ranges are not adjacent to any other range
512 if self.empty or other.empty:
513 return False
514
515 slower = self.lower
516 slower_b = self.bounds[0]
517 supper = self.upper
518 supper_b = self.bounds[1]
519 olower = other.lower
520 olower_b = other.bounds[0]
521 oupper = other.upper
522 oupper_b = other.bounds[1]
523
524 return self._upper_edge_adjacent_to_lower(
525 supper, supper_b, olower, olower_b
526 ) or self._upper_edge_adjacent_to_lower(
527 oupper, oupper_b, slower, slower_b
528 )
529
530 def union(self, other: Range[_T]) -> Range[_T]:
531 """Compute the union of this range with the `other`.
532
533 This raises a ``ValueError`` exception if the two ranges are
534 "disjunct", that is neither adjacent nor overlapping.
535 """
536
537 # Empty ranges are "additive identities"
538 if self.empty:
539 return other
540 if other.empty:
541 return self
542
543 if not self.overlaps(other) and not self.adjacent_to(other):
544 raise ValueError(
545 "Adding non-overlapping and non-adjacent"
546 " ranges is not implemented"
547 )
548
549 slower = self.lower
550 slower_b = self.bounds[0]
551 supper = self.upper
552 supper_b = self.bounds[1]
553 olower = other.lower
554 olower_b = other.bounds[0]
555 oupper = other.upper
556 oupper_b = other.bounds[1]
557
558 if self._compare_edges(slower, slower_b, olower, olower_b) < 0:
559 rlower = slower
560 rlower_b = slower_b
561 else:
562 rlower = olower
563 rlower_b = olower_b
564
565 if self._compare_edges(supper, supper_b, oupper, oupper_b) > 0:
566 rupper = supper
567 rupper_b = supper_b
568 else:
569 rupper = oupper
570 rupper_b = oupper_b
571
572 return Range(
573 rlower, rupper, bounds=cast(_BoundsType, rlower_b + rupper_b)
574 )
575
576 def __add__(self, other: Range[_T]) -> Range[_T]:
577 return self.union(other)
578
579 def difference(self, other: Range[_T]) -> Range[_T]:
580 """Compute the difference between this range and the `other`.
581
582 This raises a ``ValueError`` exception if the two ranges are
583 "disjunct", that is neither adjacent nor overlapping.
584 """
585
586 # Subtracting an empty range is a no-op
587 if self.empty or other.empty:
588 return self
589
590 slower = self.lower
591 slower_b = self.bounds[0]
592 supper = self.upper
593 supper_b = self.bounds[1]
594 olower = other.lower
595 olower_b = other.bounds[0]
596 oupper = other.upper
597 oupper_b = other.bounds[1]
598
599 sl_vs_ol = self._compare_edges(slower, slower_b, olower, olower_b)
600 su_vs_ou = self._compare_edges(supper, supper_b, oupper, oupper_b)
601 if sl_vs_ol < 0 and su_vs_ou > 0:
602 raise ValueError(
603 "Subtracting a strictly inner range is not implemented"
604 )
605
606 sl_vs_ou = self._compare_edges(slower, slower_b, oupper, oupper_b)
607 su_vs_ol = self._compare_edges(supper, supper_b, olower, olower_b)
608
609 # If the ranges do not overlap, result is simply the first
610 if sl_vs_ou > 0 or su_vs_ol < 0:
611 return self
612
613 # If this range is completely contained by the other, result is empty
614 if sl_vs_ol >= 0 and su_vs_ou <= 0:
615 return Range(None, None, empty=True)
616
617 # If this range extends to the left of the other and ends in its
618 # middle
619 if sl_vs_ol <= 0 and su_vs_ol >= 0 and su_vs_ou <= 0:
620 rupper_b = ")" if olower_b == "[" else "]"
621 if (
622 slower_b != "["
623 and rupper_b != "]"
624 and self._compare_edges(slower, slower_b, olower, rupper_b)
625 == 0
626 ):
627 return Range(None, None, empty=True)
628 else:
629 return Range(
630 slower,
631 olower,
632 bounds=cast(_BoundsType, slower_b + rupper_b),
633 )
634
635 # If this range starts in the middle of the other and extends to its
636 # right
637 if sl_vs_ol >= 0 and su_vs_ou >= 0 and sl_vs_ou <= 0:
638 rlower_b = "(" if oupper_b == "]" else "["
639 if (
640 rlower_b != "["
641 and supper_b != "]"
642 and self._compare_edges(oupper, rlower_b, supper, supper_b)
643 == 0
644 ):
645 return Range(None, None, empty=True)
646 else:
647 return Range(
648 oupper,
649 supper,
650 bounds=cast(_BoundsType, rlower_b + supper_b),
651 )
652
653 assert False, f"Unhandled case computing {self} - {other}"
654
655 def __sub__(self, other: Range[_T]) -> Range[_T]:
656 return self.difference(other)
657
658 def intersection(self, other: Range[_T]) -> Range[_T]:
659 """Compute the intersection of this range with the `other`.
660
661 .. versionadded:: 2.0.10
662
663 """
664 if self.empty or other.empty or not self.overlaps(other):
665 return Range(None, None, empty=True)
666
667 slower = self.lower
668 slower_b = self.bounds[0]
669 supper = self.upper
670 supper_b = self.bounds[1]
671 olower = other.lower
672 olower_b = other.bounds[0]
673 oupper = other.upper
674 oupper_b = other.bounds[1]
675
676 if self._compare_edges(slower, slower_b, olower, olower_b) < 0:
677 rlower = olower
678 rlower_b = olower_b
679 else:
680 rlower = slower
681 rlower_b = slower_b
682
683 if self._compare_edges(supper, supper_b, oupper, oupper_b) > 0:
684 rupper = oupper
685 rupper_b = oupper_b
686 else:
687 rupper = supper
688 rupper_b = supper_b
689
690 return Range(
691 rlower,
692 rupper,
693 bounds=cast(_BoundsType, rlower_b + rupper_b),
694 )
695
696 def __mul__(self, other: Range[_T]) -> Range[_T]:
697 return self.intersection(other)
698
699 def __str__(self) -> str:
700 return self._stringify()
701
702 def _stringify(self) -> str:
703 if self.empty:
704 return "empty"
705
706 l, r = self.lower, self.upper
707 l = "" if l is None else l # type: ignore
708 r = "" if r is None else r # type: ignore
709
710 b0, b1 = cast("Tuple[str, str]", self.bounds)
711
712 return f"{b0}{l},{r}{b1}"
713
714
715class MultiRange(List[Range[_T]]):
716 """Represents a multirange sequence.
717
718 This list subclass is an utility to allow automatic type inference of
719 the proper multi-range SQL type depending on the single range values.
720 This is useful when operating on literal multi-ranges::
721
722 import sqlalchemy as sa
723 from sqlalchemy.dialects.postgresql import MultiRange, Range
724
725 value = literal(MultiRange([Range(2, 4)]))
726
727 select(tbl).where(tbl.c.value.op("@")(MultiRange([Range(-3, 7)])))
728
729 .. versionadded:: 2.0.26
730
731 .. seealso::
732
733 - :ref:`postgresql_multirange_list_use`.
734 """
735
736 @property
737 def __sa_type_engine__(self) -> AbstractMultiRange[_T]:
738 return AbstractMultiRange()
739
740
741class AbstractRange(sqltypes.TypeEngine[_T]):
742 """Base class for single and multi Range SQL types."""
743
744 render_bind_cast = True
745
746 __abstract__ = True
747
748 @overload
749 def adapt(self, cls: Type[_TE], **kw: Any) -> _TE: ...
750
751 @overload
752 def adapt(
753 self, cls: Type[TypeEngineMixin], **kw: Any
754 ) -> TypeEngine[Any]: ...
755
756 def adapt(
757 self,
758 cls: Type[Union[TypeEngine[Any], TypeEngineMixin]],
759 **kw: Any,
760 ) -> TypeEngine[Any]:
761 """Dynamically adapt a range type to an abstract impl.
762
763 For example ``INT4RANGE().adapt(_Psycopg2NumericRange)`` should
764 produce a type that will have ``_Psycopg2NumericRange`` behaviors
765 and also render as ``INT4RANGE`` in SQL and DDL.
766
767 """
768 if (
769 issubclass(cls, (AbstractSingleRangeImpl, AbstractMultiRangeImpl))
770 and cls is not self.__class__
771 ):
772 # two ways to do this are: 1. create a new type on the fly
773 # or 2. have AbstractRangeImpl(visit_name) constructor and a
774 # visit_abstract_range_impl() method in the PG compiler.
775 # I'm choosing #1 as the resulting type object
776 # will then make use of the same mechanics
777 # as if we had made all these sub-types explicitly, and will
778 # also look more obvious under pdb etc.
779 # The adapt() operation here is cached per type-class-per-dialect,
780 # so is not much of a performance concern
781 visit_name = self.__visit_name__
782 return type( # type: ignore
783 f"{visit_name}RangeImpl",
784 (cls, self.__class__),
785 {"__visit_name__": visit_name},
786 )()
787 else:
788 return super().adapt(cls)
789
790 class comparator_factory(TypeEngine.Comparator[Range[Any]]):
791 """Define comparison operations for range types."""
792
793 def contains(self, other: Any, **kw: Any) -> ColumnElement[bool]:
794 """Boolean expression. Returns true if the right hand operand,
795 which can be an element or a range, is contained within the
796 column.
797
798 kwargs may be ignored by this operator but are required for API
799 conformance.
800 """
801 return self.expr.operate(CONTAINS, other)
802
803 def contained_by(self, other: Any) -> ColumnElement[bool]:
804 """Boolean expression. Returns true if the column is contained
805 within the right hand operand.
806 """
807 return self.expr.operate(CONTAINED_BY, other)
808
809 def overlaps(self, other: Any) -> ColumnElement[bool]:
810 """Boolean expression. Returns true if the column overlaps
811 (has points in common with) the right hand operand.
812 """
813 return self.expr.operate(OVERLAP, other)
814
815 def strictly_left_of(self, other: Any) -> ColumnElement[bool]:
816 """Boolean expression. Returns true if the column is strictly
817 left of the right hand operand.
818 """
819 return self.expr.operate(STRICTLY_LEFT_OF, other)
820
821 __lshift__ = strictly_left_of
822
823 def strictly_right_of(self, other: Any) -> ColumnElement[bool]:
824 """Boolean expression. Returns true if the column is strictly
825 right of the right hand operand.
826 """
827 return self.expr.operate(STRICTLY_RIGHT_OF, other)
828
829 __rshift__ = strictly_right_of
830
831 def not_extend_right_of(self, other: Any) -> ColumnElement[bool]:
832 """Boolean expression. Returns true if the range in the column
833 does not extend right of the range in the operand.
834 """
835 return self.expr.operate(NOT_EXTEND_RIGHT_OF, other)
836
837 def not_extend_left_of(self, other: Any) -> ColumnElement[bool]:
838 """Boolean expression. Returns true if the range in the column
839 does not extend left of the range in the operand.
840 """
841 return self.expr.operate(NOT_EXTEND_LEFT_OF, other)
842
843 def adjacent_to(self, other: Any) -> ColumnElement[bool]:
844 """Boolean expression. Returns true if the range in the column
845 is adjacent to the range in the operand.
846 """
847 return self.expr.operate(ADJACENT_TO, other)
848
849 def union(self, other: Any) -> ColumnElement[bool]:
850 """Range expression. Returns the union of the two ranges.
851 Will raise an exception if the resulting range is not
852 contiguous.
853 """
854 return self.expr.operate(operators.add, other)
855
856 def difference(self, other: Any) -> ColumnElement[bool]:
857 """Range expression. Returns the union of the two ranges.
858 Will raise an exception if the resulting range is not
859 contiguous.
860 """
861 return self.expr.operate(operators.sub, other)
862
863 def intersection(self, other: Any) -> ColumnElement[Range[_T]]:
864 """Range expression. Returns the intersection of the two ranges.
865 Will raise an exception if the resulting range is not
866 contiguous.
867 """
868 return self.expr.operate(operators.mul, other)
869
870
871class AbstractSingleRange(AbstractRange[Range[_T]]):
872 """Base for PostgreSQL RANGE types.
873
874 These are types that return a single :class:`_postgresql.Range` object.
875
876 .. seealso::
877
878 `PostgreSQL range functions <https://www.postgresql.org/docs/current/static/functions-range.html>`_
879
880 """ # noqa: E501
881
882 __abstract__ = True
883
884 def _resolve_for_literal(self, value: Range[Any]) -> Any:
885 spec = value.lower if value.lower is not None else value.upper
886
887 if isinstance(spec, int):
888 # pg is unreasonably picky here: the query
889 # "select 1::INTEGER <@ '[1, 4)'::INT8RANGE" raises
890 # "operator does not exist: integer <@ int8range" as of pg 16
891 if _is_int32(value):
892 return INT4RANGE()
893 else:
894 return INT8RANGE()
895 elif isinstance(spec, (Decimal, float)):
896 return NUMRANGE()
897 elif isinstance(spec, datetime):
898 return TSRANGE() if not spec.tzinfo else TSTZRANGE()
899 elif isinstance(spec, date):
900 return DATERANGE()
901 else:
902 # empty Range, SQL datatype can't be determined here
903 return sqltypes.NULLTYPE
904
905
906class AbstractSingleRangeImpl(AbstractSingleRange[_T]):
907 """Marker for AbstractSingleRange that will apply a subclass-specific
908 adaptation"""
909
910
911class AbstractMultiRange(AbstractRange[Sequence[Range[_T]]]):
912 """Base for PostgreSQL MULTIRANGE types.
913
914 these are types that return a sequence of :class:`_postgresql.Range`
915 objects.
916
917 """
918
919 __abstract__ = True
920
921 def _resolve_for_literal(self, value: Sequence[Range[Any]]) -> Any:
922 if not value:
923 # empty MultiRange, SQL datatype can't be determined here
924 return sqltypes.NULLTYPE
925 first = value[0]
926 spec = first.lower if first.lower is not None else first.upper
927
928 if isinstance(spec, int):
929 # pg is unreasonably picky here: the query
930 # "select 1::INTEGER <@ '{[1, 4),[6,19)}'::INT8MULTIRANGE" raises
931 # "operator does not exist: integer <@ int8multirange" as of pg 16
932 if all(_is_int32(r) for r in value):
933 return INT4MULTIRANGE()
934 else:
935 return INT8MULTIRANGE()
936 elif isinstance(spec, (Decimal, float)):
937 return NUMMULTIRANGE()
938 elif isinstance(spec, datetime):
939 return TSMULTIRANGE() if not spec.tzinfo else TSTZMULTIRANGE()
940 elif isinstance(spec, date):
941 return DATEMULTIRANGE()
942 else:
943 # empty Range, SQL datatype can't be determined here
944 return sqltypes.NULLTYPE
945
946
947class AbstractMultiRangeImpl(AbstractMultiRange[_T]):
948 """Marker for AbstractMultiRange that will apply a subclass-specific
949 adaptation"""
950
951
952class INT4RANGE(AbstractSingleRange[int]):
953 """Represent the PostgreSQL INT4RANGE type."""
954
955 __visit_name__ = "INT4RANGE"
956
957
958class INT8RANGE(AbstractSingleRange[int]):
959 """Represent the PostgreSQL INT8RANGE type."""
960
961 __visit_name__ = "INT8RANGE"
962
963
964class NUMRANGE(AbstractSingleRange[Decimal]):
965 """Represent the PostgreSQL NUMRANGE type."""
966
967 __visit_name__ = "NUMRANGE"
968
969
970class DATERANGE(AbstractSingleRange[date]):
971 """Represent the PostgreSQL DATERANGE type."""
972
973 __visit_name__ = "DATERANGE"
974
975
976class TSRANGE(AbstractSingleRange[datetime]):
977 """Represent the PostgreSQL TSRANGE type."""
978
979 __visit_name__ = "TSRANGE"
980
981
982class TSTZRANGE(AbstractSingleRange[datetime]):
983 """Represent the PostgreSQL TSTZRANGE type."""
984
985 __visit_name__ = "TSTZRANGE"
986
987
988class INT4MULTIRANGE(AbstractMultiRange[int]):
989 """Represent the PostgreSQL INT4MULTIRANGE type."""
990
991 __visit_name__ = "INT4MULTIRANGE"
992
993
994class INT8MULTIRANGE(AbstractMultiRange[int]):
995 """Represent the PostgreSQL INT8MULTIRANGE type."""
996
997 __visit_name__ = "INT8MULTIRANGE"
998
999
1000class NUMMULTIRANGE(AbstractMultiRange[Decimal]):
1001 """Represent the PostgreSQL NUMMULTIRANGE type."""
1002
1003 __visit_name__ = "NUMMULTIRANGE"
1004
1005
1006class DATEMULTIRANGE(AbstractMultiRange[date]):
1007 """Represent the PostgreSQL DATEMULTIRANGE type."""
1008
1009 __visit_name__ = "DATEMULTIRANGE"
1010
1011
1012class TSMULTIRANGE(AbstractMultiRange[datetime]):
1013 """Represent the PostgreSQL TSRANGE type."""
1014
1015 __visit_name__ = "TSMULTIRANGE"
1016
1017
1018class TSTZMULTIRANGE(AbstractMultiRange[datetime]):
1019 """Represent the PostgreSQL TSTZRANGE type."""
1020
1021 __visit_name__ = "TSTZMULTIRANGE"
1022
1023
1024_max_int_32 = 2**31 - 1
1025_min_int_32 = -(2**31)
1026
1027
1028def _is_int32(r: Range[int]) -> bool:
1029 return (r.lower is None or _min_int_32 <= r.lower <= _max_int_32) and (
1030 r.upper is None or _min_int_32 <= r.upper <= _max_int_32
1031 )