1from __future__ import annotations
2
3# built-in
4from collections import Counter
5from contextlib import suppress
6from typing import Sequence, TypeVar
7
8# app
9from ..libraries import prototype
10from ..utils import find_ngrams
11
12
# Module-level registry of external libraries, obtained by cloning the shared
# ``prototype`` from ``..libraries``. ``Base.external_answer`` queries it via
# ``libraries.get_libs(<algorithm class name>)``.
# NOTE(review): presumably the clone keeps changes local to this module -- confirm.
libraries = prototype.clone()
# NOTE(review): the effect of ``optimize()`` is not visible from this file;
# see ``..libraries`` for what it does to the registry.
libraries.optimize()
# Generic element type used by the ``Counter[T]`` helper signatures below.
T = TypeVar('T')
16
17
class Base:
    """Common interface and helpers for distance-based algorithms.

    Subclasses implement ``__call__`` so that it returns the raw distance
    between the given sequences. Similarity and the normalized variants are
    derived from that value and from ``maximum``.
    """

    def __init__(self, qval: int = 1, external: bool = True) -> None:
        """Store the algorithm settings.

        Args:
            qval: how ``_get_sequences`` splits the input. A falsy value
                splits text by words, ``1`` leaves sequences untouched and
                any other value splits them into q-grams of that size.
            external: whether ``external_answer`` may delegate the
                computation to an external library.
        """
        self.qval = qval
        self.external = external

    def __call__(self, *sequences: Sequence[object]) -> float:
        """Compute the raw value of the algorithm. Must be overridden."""
        raise NotImplementedError

    @staticmethod
    def maximum(*sequences: Sequence[object]) -> float:
        """Get maximum possible value: the length of the longest sequence.

        Returns 0 when called without sequences, so that the derived methods
        (``normalized_distance``, ``quick_answer`` of subclasses, ...) do not
        fail with ``ValueError`` on empty input.
        """
        return max(map(len, sequences), default=0)

    def distance(self, *sequences: Sequence[object]) -> float:
        """Get distance between sequences."""
        return self(*sequences)

    def similarity(self, *sequences: Sequence[object]) -> float:
        """Get sequences similarity.

        similarity = maximum - distance
        """
        return self.maximum(*sequences) - self.distance(*sequences)

    def normalized_distance(self, *sequences: Sequence[object]) -> float:
        """Get distance from 0 to 1.

        Returns 0 when ``maximum`` is 0 (nothing to compare), which also
        avoids a division by zero.
        """
        maximum = self.maximum(*sequences)
        if maximum == 0:
            return 0
        return self.distance(*sequences) / maximum

    def normalized_similarity(self, *sequences: Sequence[object]) -> float:
        """Get similarity from 0 to 1.

        normalized_similarity = 1 - normalized_distance
        """
        return 1 - self.normalized_distance(*sequences)

    def external_answer(self, *sequences: Sequence[object]) -> float | None:
        """Try to get answer from known external libraries.

        Returns:
            The value computed by the first library that accepts the input
            and does not raise, or ``None`` when external libraries are
            disabled, a custom ``test_func`` is set, or no library succeeds.
        """
        # if this feature disabled
        if not getattr(self, 'external', False):
            return None
        # all external libs don't support test_func
        test_func = getattr(self, 'test_func', self._ident)
        if test_func is not self._ident:
            return None
        # try to get external libs for algorithm
        libs = libraries.get_libs(self.__class__.__name__)
        for lib in libs:
            # if conditions not satisfied
            if not lib.check_conditions(self, *sequences):
                continue
            # if library is not installed yet
            func = lib.get_function()
            if func is None:
                continue
            prepared_sequences = lib.prepare(*sequences)
            # deliberate best effort: fail side libraries silently
            # and try the next one
            with suppress(Exception):
                return func(*prepared_sequences)
        return None

    def quick_answer(self, *sequences: Sequence[object]) -> float | None:
        """Try to get answer quickly, without calling the main implementation.

        If there are no sequences, one sequence, or all sequences are equal,
        return 0. If any sequence is empty, return maximum. Otherwise try to
        get the answer from external libraries (``None`` if unavailable).
        """
        if not sequences:
            return 0
        if len(sequences) == 1:
            return 0
        if self._ident(*sequences):
            return 0
        if not all(sequences):
            return self.maximum(*sequences)
        # try get answer from external libs
        return self.external_answer(*sequences)

    @staticmethod
    def _ident(*elements: object) -> bool:
        """Return True if all elements are equal.

        Returns False when called without elements if they are hashable-path
        checked (an empty set has length 0, not 1).
        """
        try:
            # for hashable elements
            return len(set(elements)) == 1
        except TypeError:
            # for unhashable elements: compare neighbours pairwise
            for e1, e2 in zip(elements, elements[1:]):
                if e1 != e2:
                    return False
            return True

    def _get_sequences(self, *sequences: Sequence[object]) -> list:
        """Prepare sequences according to ``self.qval``.

        qval falsy (e.g. None): split text by words
        qval == 1: do not split sequences; for text this means comparing
            character by character
        otherwise: split sequences into q-grams of size qval
        """
        # by words
        if not self.qval:
            return [s.split() for s in sequences]  # type: ignore[attr-defined]
        # by chars
        if self.qval == 1:
            return list(sequences)
        # by n-grams
        return [find_ngrams(s, self.qval) for s in sequences]

    def _get_counters(self, *sequences: Sequence[object]) -> list[Counter]:
        """Prepare sequences and convert them to Counters.

        Input that already consists only of Counters is returned as a list
        without any further processing.
        """
        # already Counters
        if all(isinstance(s, Counter) for s in sequences):
            return list(sequences)  # type: ignore[arg-type]
        return [Counter(s) for s in self._get_sequences(*sequences)]

    def _intersect_counters(self, *sequences: Counter[T]) -> Counter[T]:
        """Return the multiset intersection (minimum counts) of the counters.

        Returns an empty Counter when no counters are given.
        """
        if not sequences:
            return Counter()
        # copy so that the caller's first counter is not mutated
        intersection = sequences[0].copy()
        for s in sequences[1:]:
            intersection &= s
        return intersection

    def _union_counters(self, *sequences: Counter[T]) -> Counter[T]:
        """Return the multiset union (maximum counts) of the counters.

        Returns an empty Counter when no counters are given.
        """
        if not sequences:
            return Counter()
        # copy so that the caller's first counter is not mutated
        union = sequences[0].copy()
        for s in sequences[1:]:
            union |= s
        return union

    def _sum_counters(self, *sequences: Counter[T]) -> Counter[T]:
        """Return the element-wise sum of the counters.

        Returns an empty Counter when no counters are given.
        """
        if not sequences:
            return Counter()
        # copy so that the caller's first counter is not mutated
        result = sequences[0].copy()
        for s in sequences[1:]:
            result += s
        return result

    def _count_counters(self, counter: Counter) -> int:
        """Return all elements count from Counter.

        When the instance has a truthy ``as_set`` attribute, every distinct
        key is counted once; otherwise the stored counts are summed.
        """
        if getattr(self, 'as_set', False):
            return len(set(counter))
        return sum(counter.values())

    def __repr__(self) -> str:
        """Return ``ClassName({instance attributes})``."""
        return f'{type(self).__name__}({self.__dict__})'
172
173
class BaseSimilarity(Base):
    """Base for algorithms whose ``__call__`` returns a similarity.

    The relation between the two measures is inverted compared to ``Base``:
    the raw value is the similarity and the distance is derived from it.
    """

    def distance(self, *sequences: Sequence[object]) -> float:
        """Get distance between sequences.

        distance = maximum - similarity
        """
        upper_bound = self.maximum(*sequences)
        return upper_bound - self.similarity(*sequences)

    def similarity(self, *sequences: Sequence[object]) -> float:
        """Get sequences similarity (the raw value of the algorithm)."""
        return self(*sequences)

    def quick_answer(self, *sequences: Sequence[object]) -> float | None:
        """Try to get answer quickly, without calling the main implementation.

        With fewer than two sequences, or when all sequences are equal,
        return maximum. If any sequence is empty, return 0. Otherwise
        fall back to the external libraries.
        """
        # nothing to compare against, or everything is identical
        if len(sequences) < 2 or self._ident(*sequences):
            return self.maximum(*sequences)
        # at least one empty sequence: no similarity at all
        if any(not s for s in sequences):
            return 0
        # try get answer from external libs
        return self.external_answer(*sequences)