Coverage for blind_charging/person.py: 83%
266 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-02-17 20:36 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-02-17 20:36 +0000
1import re
2from collections import defaultdict
3from typing import DefaultDict, List, Set
5from similarity.damerau import Damerau
6from similarity.jarowinkler import JaroWinkler
8from .individual import Individual, MergeDifferentPersonsError
9from .locale import Locale
10from .mask_const import WEB_COLORS
11from .source_text import nlp
# Shared string-distance metrics used by _name_match when comparing names.
damerau = Damerau()
jarowinkler = JaroWinkler()
def _name_match(s1: Set[str], s2: Set[str], max_dist: int = 0) -> bool:
    """Report whether two sets of names share a (possibly fuzzy) common name.

    How names are compared depends on ``max_dist``:

    * ``0``: exact match, i.e. the two sets have an element in common.
    * ``>= 1``: some pair ``(a, b)`` has ``damerau.distance(a, b)`` at most
      ``max_dist``.
    * anything else (e.g. a fraction between 0 and 1): some pair has
      ``jarowinkler.distance(a, b)`` at most ``max_dist``.

    :param s1: set of names
    :param s2: another set of names
    :param max_dist: largest distance that still counts as a match
    :returns: True if s1 and s2 share a common name
    """
    # TODO (acw): Try Jaro-Winkler here instead of levenshtein, which
    # seems too rigid
    if max_dist == 0:
        return bool(s1 & s2)

    # The metric depends only on max_dist, so choose it once rather than
    # re-deciding for every pair of names.
    distance = damerau.distance if max_dist >= 1 else jarowinkler.distance
    return any(distance(a, b) <= max_dist for a in s1 for b in s2)
def add_compound_name_parts(name_set: Set[str]) -> Set[str]:
    """Return a copy of ``name_set`` expanded with variants of compound names.

    For every name that starts with ``word-word`` the result also contains:

    - each hyphen-separated piece,
    - the name with hyphens removed,
    - the name with hyphens replaced by spaces.

    For every name that starts with ``word<whitespace>word`` the result also
    contains:

    - each whitespace-separated piece,
    - the name with whitespace removed,
    - the name with each whitespace character replaced by a hyphen.

    The input set is not modified.

    :param name_set: names to expand
    :returns: new set holding the original names plus the derived variants
    """
    hyphen_pattern = re.compile(r"\w+-\w+")
    space_pattern = re.compile(r"\w+\s\w+")
    expanded = set(name_set)
    for name_part in name_set:
        if hyphen_pattern.match(name_part):
            # A trailing or doubled hyphen later in the name would yield empty
            # pieces; an empty "name" is never a useful variant.
            expanded.update(piece for piece in name_part.split("-") if piece)
            expanded.add(name_part.replace("-", ""))
            expanded.add(name_part.replace("-", " "))
        if space_pattern.match(name_part):
            expanded.update(name_part.split())
            expanded.add(re.sub(r"\s", "", name_part))
            expanded.add(re.sub(r"\s", "-", name_part))

    return expanded
class PersonName(Individual):
    """A person with identifiers and the set of name variants seen for them.

    Names are stored as sets (``first``, ``middle``, ``last``, ``alias``) so
    that several spellings of the same person can be accumulated via
    :meth:`merge`. All stored names are upper-cased.
    """

    def __init__(
        self,
        indicator=None,
        report_id=None,
        name=None,
        f_name=None,
        m_name=None,
        l_name=None,
        alias=None,
        sfno=None,
        court_no=None,
        custom_label=None,
    ):
        """Build a person from an indicator, identifiers and name fields.

        :param indicator: string whose upper-case letters form the person
            type and whose digits form the person number
        :param report_id: stored as the first element of the id triplet
        :param name: free-form full name; takes precedence over the
            ``f_name`` / ``m_name`` / ``l_name`` fields when given
        :param f_name: first name, used only when ``name`` is None
        :param m_name: middle name, used only when ``name`` is None
        :param l_name: last name, used only when ``name`` is None
        :param alias: single alias, stored upper-cased
        :param sfno: identifier; two persons with non-None values are equal
            exactly when the values are equal (see ``__eq__``)
        :param court_no: identifier compared like ``sfno``, checked first
        :param custom_label: when set, used verbatim as the code name by
            :meth:`dedupe`
        """
        # Store input arguments for serialization
        self._dict = {
            "indicator": indicator,
            "report_id": report_id,
            "name": name,
            "f_name": f_name,
            "m_name": m_name,
            "l_name": l_name,
            "alias": alias,
            "sfno": sfno,
            "court_no": court_no,
            "custom_label": custom_label,
        }

        # Parse args: letters of the indicator are the type, digits the number.
        # An empty result is normalised to None.
        ptype = pnum = None
        if indicator is not None:
            ptype = re.sub("[^A-Z]+", "", indicator) or None
            pnum = re.sub("[^0-9]+", "", indicator) or None
        self.indicator = ptype + pnum if ptype and pnum else None

        self.custom_label = custom_label
        self.id_triplet = {(report_id, ptype, pnum)}
        self.sfno = sfno
        self.court_no = court_no
        self.code_name = None  # to be filled later during dedup
        self.cls = ""
        self.color = ""
        # make names sets in case there are multiple versions
        # (e.g., Mike vs. M.)
        self.full_code_name = None  # see above
        self.first = set()
        self.middle = set()
        self.last = set()
        self.alias = set()
        self._name = name
        if alias is not None:
            self.alias.add(alias.upper())
        if name is not None:
            self.parse_name(name)
            self.last = add_compound_name_parts(self.last)
        elif not (f_name is None and m_name is None and l_name is None):
            self.parse_full_name(f_name=f_name, m_name=m_name, l_name=l_name)

        # Cache flag indicating if this person is unknown
        self._is_unknown = "UNKNOWN" in {
            str(f_name).upper(),
            str(m_name).upper(),
            str(l_name).upper(),
        }

    def to_dict(self):
        """Return dictionary of input arguments.

        The following is true:
        ```
        pn1 = PersonName(...)
        d = pn1.to_dict()
        pn1 == PersonName(**d)
        ```
        """
        return self._dict.copy()

    def __str__(self):
        return str(
            {
                "code_name": self.code_name,
                "id_triplet": self.id_triplet,
                "first": self.first,
                "middle": self.middle,
                "last": self.last,
            }
        )

    def __repr__(self):
        return self.__str__()

    def __hash__(self):
        # Hash the triplets as a frozenset: unlike str(set), the result does
        # not depend on the iteration order of the set.
        return hash(frozenset(self.id_triplet))

    def __eq__(self, other):
        """Decide whether ``other`` denotes the same person.

        Checks, in order: court number, sfno, fuzzy last/first name match,
        shared alias, a name of one appearing as alias of the other, and
        finally identical id triplets when either side has no names.
        """
        if not isinstance(other, PersonName):
            # Let Python try the reflected comparison instead of failing with
            # AttributeError on unrelated objects.
            return NotImplemented

        # if we have two non-None equal person number or court number.
        # then it has to be the same person
        if self.court_no is not None and other.court_no is not None:
            return bool(self.court_no == other.court_no)

        if self.sfno is not None and other.sfno is not None:
            return bool(self.sfno == other.sfno)

        # otherwise match names
        if _name_match(self.last, other.last, 1):
            # after they match last name,
            # if they both have same first name, then match
            if self.first and other.first:
                if _name_match(self.first, other.first, 1):
                    return True
            else:
                # if no first name, last name only match is a match
                return True

        # Match alias
        if self.alias & other.alias:
            return True

        # Match name to alias (in both directions). We do not compare initials.
        for names, aliases in (
            (other.last | other.first, self.alias),
            (self.last | self.first, other.alias),
        ):
            for name in names:
                if len(name.strip(".")) > 1 and name in aliases:
                    return True

        # Match indicators if no names
        self_nameless = not self.last and not self.first
        other_nameless = not other.last and not other.first
        if (self_nameless or other_nameless) and self.id_triplet == other.id_triplet:
            return True

        return False

    def is_chargeable(self):
        """Return True if an sfno is set or any id triplet has type B, C, D or S."""
        if self.sfno is not None:
            return True
        return any(
            ptype in {"B", "C", "D", "S"} for _report_id, ptype, _pnum in self.id_triplet
        )

    def is_unknown(self) -> bool:
        """Check whether person is unknown."""
        return self._is_unknown

    def get_indicator(self):
        """Return the assigned code name.

        :raises Exception: if no code name has been assigned yet
        """
        if self.code_name is None:
            raise Exception("Code name not assigned")
        return self.code_name

    def parse_full_name(self, f_name, m_name, l_name):
        """Add already-separated first / middle / last names to the name sets.

        Each value is stripped of surrounding whitespace and dots and
        upper-cased; falsy values are ignored. First and last names are also
        added word by word.
        """
        f_name = f_name.strip().strip(".").upper() if f_name else None
        m_name = m_name.strip().strip(".").upper() if m_name else None
        l_name = l_name.strip().strip(".").upper() if l_name else None

        if f_name:
            self.first.add(f_name)
            self.first.update(f_name.split())

        if m_name:
            self.middle.add(m_name)

        if l_name:
            self.last.add(l_name)
            self.last.update(l_name.split())

    def parse_name(self, name):
        """Split a free-form name into first / middle / last name sets.

        Tokens are whitespace-separated, stripped of dots and upper-cased;
        stop words (per ``nlp.vocab``) and single non-word characters are
        dropped. The number of remaining tokens selects the layout.
        """
        parts = [p.strip().strip(".").upper() for p in name.split()]
        parts = [
            p for p in parts if not nlp.vocab[p].is_stop and not re.match(r"^\W$", p)
        ]
        # NOTE: a token can be empty here (e.g. a lone "."), so trailing commas
        # are detected with endswith() rather than by indexing [-1].
        if len(parts) == 1:
            # Last name
            self.last.add(parts[0])
        elif len(parts) == 2:
            if parts[0].endswith(","):
                # last, first
                self.last.add(parts[0].strip(","))
                self.first.add(parts[1])
            elif len(parts[1]) == 1:
                # last f (dots were already stripped, so "f." arrives as "f")
                # assume no first l. scenario
                self.last.add(parts[0])
                self.first.add(parts[1])
            else:
                # first last
                self.first.add(parts[0])
                self.last.add(parts[1])
        elif len(parts) == 3:
            if parts[0].endswith(","):
                # last, first m(iddle)
                self.last.add(parts[0].strip(","))
                self.first.add(parts[1])
                self.middle.add(parts[2])
            elif parts[1].endswith(","):
                # last last, first
                self.last.add(parts[0] + " " + parts[1].strip(","))
                self.first.add(parts[2])
            else:
                # first middle last
                self.first.add(parts[0])
                self.middle.add(parts[1])
                self.last.add(parts[2])
        else:
            # anything of length 4 or longer is likely an erroneous parse
            # treating it like a last name (also reached when no token is left)
            formatted_name = name.strip().strip(".").upper()
            if "," in formatted_name:
                pieces = formatted_name.split(",")
                last = pieces[0].strip()
                first = pieces[1].strip()
            else:
                first = None
                last = formatted_name
            self.last.add(last)
            self.last.update(last.split())
            if first:
                self.first.add(first)
                self.first.update(first.split())

        # remove any names of length 0 from sets
        self.first.discard("")
        self.middle.discard("")
        self.last.discard("")

    def _name_rep_impl(self) -> List[str]:
        """Build regex patterns for the ways this person may appear in text.

        Covers bare first/last names, first/last/middle combinations with
        initials and commas, the indicator in several punctuation styles, and
        indicator + name pairs.

        :returns: pattern strings, longest first (ties in alphabetical order)
        """
        reps: Set[str] = set()

        # Escape the full name and its initial separately: slicing [0] off an
        # escaped string would yield a lone backslash whenever the name starts
        # with a regex-special character. Empty names are skipped because they
        # would produce patterns that match everywhere.
        last_literals = [re.escape(last) for last in self.last if last]
        first_literals = [(re.escape(f), re.escape(f[0])) for f in self.first if f]
        middle_literals = [(re.escape(m), re.escape(m[0])) for m in self.middle if m]

        reps.update(last_literals)
        reps.update(f for f, _f0 in first_literals)

        for last in last_literals:
            for f, f0 in first_literals:
                reps.add(f + r"\s+" + last)  # first last
                reps.add(f0 + r"\s+" + last)  # f last
                reps.add(f0 + r"\." + last)  # f.last
                reps.add(f0 + r"\.\s+" + last)  # f. last
                reps.add(last + r"\s*,\s+" + f)  # last, first
                reps.add(last + r"\s+" + f0)  # last f
                reps.add(last + r"\s+" + f0 + r"\.")  # last f.
                reps.add(last + r"\s*,\s+" + f0)  # last, f
                reps.add(last + r"\s*,\s+" + f0 + r"\.")  # last, f.
                # last first - for if name input is accidentally reversed
                reps.add(last + r"\s+" + f)

        for f, _f0 in first_literals:
            for m, m0 in middle_literals:
                for last in last_literals:
                    reps.add(f + r"\s+" + m + r"\s+" + last)  # first middle last
                    reps.add(f + r"\s+" + m0 + r"\s+" + last)  # first m last
                    reps.add(f + r"\s+" + m0 + r"\.\s+" + last)  # first m. last
                    reps.add(last + r"\s*,\s+" + f + r"\s+" + m)  # last, first middle
                    reps.add(last + r"\s*,\s+" + f + r"\s+" + m0)  # last, first m
                    reps.add(
                        last + r"\s*,\s+" + f + r"\s+" + m0 + r"\."
                    )  # last, first m.
                    reps.add(m + r"\s+" + last)  # middle last

        # Name patterns must end on a word boundary.
        reps = {r"%s\b" % x for x in reps}
        indicators: Set[str] = set()
        indicator_reps: Set[str] = set()
        if self.indicator:
            indicator_esc = re.escape(self.indicator)
            # R/W/1: a slash after every letter
            slash_base = re.sub(r"([A-Za-z])", r"\1/", indicator_esc)
            # R/W1: a slash only between adjacent letters
            middle_slash_base = re.sub(
                r"([A-Za-z])(?=[A-Za-z])", r"\1/", indicator_esc
            )
            for base in (indicator_esc, slash_base, middle_slash_base):
                indicators.add(r"\W%s" % base)  # e.g. RW1 after a non-word char
                indicators.add(r"\(%s\)" % base)  # e.g. (RW1)

            for indicator in indicators:
                for rep in reps:
                    indicator_reps.add(r"%s\s*%s" % (indicator, rep))
                    indicator_reps.add(r"%s\s*%s" % (rep, indicator))

        # Stand-alone name patterns must also start on a word boundary.
        reps = {r"\b%s" % x for x in reps}
        reps = reps.union(indicators).union(indicator_reps)

        # the longest representation first for replacement purpose; ties are
        # broken alphabetically so the order does not depend on set iteration
        return sorted(reps, key=lambda rep: (-len(rep), rep))

    def merge(self, other):
        """Fold the identifiers and names of ``other`` into this person.

        :raises MergeDifferentPersonsError: if ``other`` does not compare
            equal to this person
        """
        if self != other:
            raise MergeDifferentPersonsError()

        if self.sfno is None:
            self.sfno = other.sfno
        # NOTE(review): court_no is not carried over the way sfno is --
        # confirm whether that is intentional.
        self.id_triplet = self.id_triplet.union(other.id_triplet)
        self.first = self.first.union(other.first)
        self.middle = self.middle.union(other.middle)
        self.last = self.last.union(other.last)
        self.alias = self.alias.union(other.alias)

    @classmethod
    def dedupe(cls, persons: List["PersonName"], locale: Locale) -> List["PersonName"]:
        """De-duplicate PersonName list.

        After the base-class de-duplication, every remaining person gets a
        CSS class, a color, and a code name / full code name.

        :param persons: List of persons
        :param locale: Current location information
        :returns: De-duplicated list
        """
        persons = super().dedupe(persons, locale)

        REF_NAMES = locale.indicators

        type_counts: DefaultDict[str, int] = defaultdict(int)
        for count, p in enumerate(persons):
            # TODO(jnu): derive these values in a cleaner way
            p.cls = "masked-suspect" if p.is_chargeable() else "masked-person"
            p.color = WEB_COLORS[count % len(WEB_COLORS)]

            # redact with custom label if present
            if p.custom_label:
                p.code_name = p.custom_label
                p.full_code_name = p.custom_label
                continue

            code_name_parts = []
            full_name_parts = []
            # Iterate in sorted order so that numbering is reproducible even
            # when two person types map to the same reference name.
            ptypes = sorted(
                {ptype for _report_id, ptype, _pnum in p.id_triplet if ptype is not None}
            )
            for ptype in ptypes:
                cname = REF_NAMES[ptype]
                type_counts[cname] += 1
                code_name_parts.append(ptype + str(type_counts[cname]))
                full_name_parts.append(cname + " " + str(type_counts[cname]))
            # if no code name found for current person type, use person
            if not code_name_parts:
                # Use default value from defaultdict,
                # which is supposed to be person
                cname = REF_NAMES[""]
                type_counts[cname] += 1
                code_name_parts.append("PERSON_" + str(type_counts[cname]))
                full_name_parts.append(cname + " " + str(type_counts[cname]))

            # NOTE(jnu): deterministic order for code name
            p.code_name = "(%s)" % " / ".join(sorted(code_name_parts))
            p.full_code_name = " / ".join(sorted(full_name_parts))

        return persons