Coverage for blind_charging/person.py: 83%

266 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-02-17 20:36 +0000

1import re 

2from collections import defaultdict 

3from typing import DefaultDict, List, Set 

4 

5from similarity.damerau import Damerau 

6from similarity.jarowinkler import JaroWinkler 

7 

8from .individual import Individual, MergeDifferentPersonsError 

9from .locale import Locale 

10from .mask_const import WEB_COLORS 

11from .source_text import nlp 

12 

# String-distance scorers used by _name_match to compare names

14damerau = Damerau() 

15jarowinkler = JaroWinkler() 

16 

17 

18def _name_match(s1: Set[str], s2: Set[str], max_dist: int = 0) -> bool: 

19 """ 

20 detect if two sets share the same name 

21 max_dist is the max edit distance 

22 if max_dist in (0, 1), use normalized edit dist 

23 

24 param s1: set of names 

25 param s2: another set of names 

26 returns: True if s1 and s2 share a common name 

27 """ 

28 # TODO (acw): Try Jaro-Winkler here instead of levenshtein, which 

29 # seems too rigid 

30 if max_dist == 0: 

31 return bool(s1 & s2) 

32 else: 

33 for a in s1: 

34 for b in s2: 

35 dist = ( 

36 damerau.distance(a, b) 

37 if max_dist >= 1 

38 else jarowinkler.distance(a, b) 

39 ) 

40 if dist <= max_dist: 

41 return True 

42 return False 

43 

44 

def add_compound_name_parts(name_set: Set[str]) -> Set[str]:
    """
    Expand compound names into their component and alternate spellings.

    For every name that starts with a ``word-word`` (hyphenated) or
    ``word word`` (space separated) pattern, the returned set additionally
    contains:

    - each piece obtained by splitting on the separator
    - the name with the separator removed
    - the name with the separator swapped (hyphen <-> space)

    Only the names originally present are expanded; variants added by this
    function are not expanded again. The input set is not modified.

    param name_set: set of names to expand
    returns: a new set holding the original names plus all variants
    """
    # NOTE: `match` anchors at the start of the string, so a name is only
    # treated as compound when the pattern occurs at its beginning.
    hyphen_pattern = re.compile(r"\w+-\w+")
    space_pattern = re.compile(r"\w+\s\w+")

    # Work on a copy so the caller's set is untouched and so that we do not
    # mutate the set we are iterating over.
    new_name_set = name_set.copy()
    for name_part in name_set:
        if hyphen_pattern.match(name_part):
            new_name_set.update(name_part.split("-"))
            new_name_set.add(name_part.replace("-", ""))
            new_name_set.add(name_part.replace("-", " "))
        if space_pattern.match(name_part):
            new_name_set.update(name_part.split())
            new_name_set.add(re.sub(r"\s", "", name_part))
            new_name_set.add(re.sub(r"\s", "-", name_part))

    return new_name_set

66 

67 

class PersonName(Individual):
    """A person referenced in a report, together with known name variants.

    Names are stored as sets (``first``, ``middle``, ``last``, ``alias``) so
    that several spellings of the same name (e.g. "MIKE" vs. "M") can be
    kept side by side. ``code_name`` / ``full_code_name`` / ``cls`` /
    ``color`` start out empty and are filled in by :meth:`dedupe`.
    """

    def __init__(
        self,
        indicator=None,
        report_id=None,
        name=None,
        f_name=None,
        m_name=None,
        l_name=None,
        alias=None,
        sfno=None,
        court_no=None,
        custom_label=None,
    ):
        """Build a person from an indicator and whatever name data is known.

        ``name`` (a single free-form string) takes precedence; the separate
        ``f_name`` / ``m_name`` / ``l_name`` values are only parsed when
        ``name`` is None.
        """
        # Store input arguments for serialization
        self._dict = {
            "indicator": indicator,
            "report_id": report_id,
            "name": name,
            "f_name": f_name,
            "m_name": m_name,
            "l_name": l_name,
            "alias": alias,
            "sfno": sfno,
            "court_no": court_no,
            "custom_label": custom_label,
        }

        # Split the indicator into its upper-case letters (person type) and
        # its digits (person number); an empty result counts as missing.
        if indicator is None:
            ptype = None
            pnum = None
        else:
            ptype = re.sub("[^A-Z]+", "", indicator) or None
            pnum = re.sub("[^0-9]+", "", indicator) or None

        # The normalized indicator only exists when both halves are present.
        self.indicator = ptype + pnum if (ptype and pnum) else None
        self.custom_label = custom_label
        self.id_triplet = {(report_id, ptype, pnum)}
        self.sfno = sfno
        self.court_no = court_no
        self.code_name = None  # to be filled later during dedup
        self.cls = ""
        self.color = ""
        self.full_code_name = None  # to be filled later during dedup

        # make names sets in case there are multiple versions
        # (e.g., Mike vs. M.)
        self.first = set()
        self.middle = set()
        self.last = set()
        self.alias = set()
        self._name = name

        if alias is not None:
            self.alias.add(alias.upper())

        if name is not None:
            self.parse_name(name)
            # Compound last names are only expanded on this code path.
            self.last = add_compound_name_parts(self.last)
        elif not (f_name is None and m_name is None and l_name is None):
            self.parse_full_name(f_name=f_name, m_name=m_name, l_name=l_name)

        # Cache flag indicating if this person is unknown. Only the separate
        # name components are inspected here, not the free-form `name`.
        self._is_unknown = "UNKNOWN" in {
            str(f_name).upper(),
            str(m_name).upper(),
            str(l_name).upper(),
        }

    def to_dict(self):
        """Return a copy of the constructor arguments.

        The result can be passed straight back to the constructor:
        ```
        pn1 = PersonName(...)
        d = pn1.to_dict()
        pn2 = PersonName(**d)
        ```
        """
        return self._dict.copy()

    def __str__(self):
        return str(
            {
                "code_name": self.code_name,
                "id_triplet": self.id_triplet,
                "first": self.first,
                "middle": self.middle,
                "last": self.last,
            }
        )

    def __repr__(self):
        return self.__str__()

    def __hash__(self):
        # NOTE(review): the hash is derived from id_triplet only, while
        # __eq__ can report two persons with different id_triplets as equal
        # (e.g. on a name match) -- confirm this is acceptable for the
        # containers these objects are stored in.
        return hash(str(self.id_triplet))

    def __eq__(self, other):
        """Decide whether ``self`` and ``other`` denote the same person.

        Checks, in order: court number, sfno, last/first name (distance
        threshold 1), shared alias, name-vs-alias, and finally the
        id_triplet when either side has neither a first nor a last name.
        """
        # if we have two non-None equal person number or court number.
        # then it has to be the same person; two different non-None values
        # mean it cannot be the same person.
        if self.court_no is not None and other.court_no is not None:
            return self.court_no == other.court_no

        if self.sfno is not None and other.sfno is not None:
            return self.sfno == other.sfno

        # otherwise match names
        if _name_match(self.last, other.last, 1):
            if self.first and other.first:
                # after they match last name,
                # if they both have same first name, then match
                if _name_match(self.first, other.first, 1):
                    return True
                # A first-name mismatch is not final: the alias checks
                # below may still establish a match.
            else:
                # if no first name, last name only match is a match
                return True

        # Match alias
        if self.alias & other.alias:
            return True

        # Match name to alias, in both directions.
        for name in other.last.union(other.first):
            # We do not compare initials
            if len(name.strip(".")) > 1 and name in self.alias:
                return True
        for name in self.last.union(self.first):
            # We do not compare initials
            if len(name.strip(".")) > 1 and name in other.alias:
                return True

        # Match indicators if no names
        self_nameless = not self.last and not self.first
        other_nameless = not other.last and not other.first
        if (self_nameless or other_nameless) and self.id_triplet == other.id_triplet:
            return True

        return False

    def is_chargeable(self):
        """Return True if the person has an sfno or a B/C/D/S person type."""
        if self.sfno is not None:
            return True
        return any(
            ptype in {"B", "C", "D", "S"} for _report_id, ptype, _pnum in self.id_triplet
        )

    def is_unknown(self) -> bool:
        """Check whether person is unknown."""
        return self._is_unknown

    def get_indicator(self):
        """Return the assigned code name.

        Raises ``Exception`` when no code name has been assigned yet.
        """
        if self.code_name is None:
            raise Exception("Code name not assigned")
        return self.code_name

    def parse_full_name(self, f_name, m_name, l_name):
        """Record separately supplied first / middle / last names.

        Each non-empty value is stripped of surrounding whitespace and dots
        and upper-cased. First and last names are stored both whole and as
        their whitespace-separated pieces; the middle name is stored whole.
        """

        def _clean(value):
            # Falsy input (None, "") is treated as "not provided".
            return None if not value else value.strip().strip(".").upper()

        f_name = _clean(f_name)
        m_name = _clean(m_name)
        l_name = _clean(l_name)

        if f_name:
            self.first.add(f_name)
            self.first.update(f_name.split())

        if m_name:
            self.middle.add(m_name)

        if l_name:
            self.last.add(l_name)
            self.last.update(l_name.split())

    def parse_name(self, name):
        """Split a free-form name string into first / middle / last parts.

        Tokens are upper-cased and stripped of surrounding dots; stop words
        (per ``nlp.vocab``) and single non-word characters are dropped. The
        layout is then guessed from the number of remaining tokens and from
        where a trailing comma appears.
        """
        parts = [p.strip().strip(".").upper() for p in name.split()]
        parts = [
            p for p in parts if not nlp.vocab[p].is_stop and not re.match(r"^\W$", p)
        ]
        # NOTE: a token may be empty here (e.g. a lone "." in the input), so
        # comma detection uses endswith() instead of indexing the last
        # character, which would raise IndexError on an empty token.
        if len(parts) == 1:
            # Last name
            self.last.add(parts[0])
        elif len(parts) == 2:
            if parts[0].endswith(","):
                # last, first
                self.last.add(parts[0].strip(","))
                self.first.add(parts[1])
            else:
                # first last or last f
                # assume no first l. scenario
                if (len(parts[1]) == 1) or (len(parts[1]) == 2 and parts[1][1] == "."):
                    # last f. or last f
                    self.last.add(parts[0])
                    self.first.add(parts[1])
                else:
                    # first last
                    self.first.add(parts[0])
                    self.last.add(parts[1])
        elif len(parts) == 3:
            if parts[0].endswith(","):
                # last, first m(iddle)
                self.last.add(parts[0].strip(","))
                self.first.add(parts[1])
                self.middle.add(parts[2])  # middle
            elif parts[1].endswith(","):
                # last last, first
                self.last.add(parts[0] + " " + parts[1].strip(","))
                self.first.add(parts[2])
            else:
                # first middle last
                self.first.add(parts[0])
                self.middle.add(parts[1])  # middle
                self.last.add(parts[2])
        else:
            # anything of length 4 or longer is likely an erroneous parse
            # treating it like a last name (zero remaining tokens also ends
            # up here)
            formatter_name = name.strip().strip(".").upper()
            if "," in formatter_name:
                parts = formatter_name.split(",")
                last = parts[0].strip()
                f = parts[1].strip()
            else:
                f = None
                last = formatter_name
            self.last.add(last)
            self.last.update(last.split())
            if f:
                self.first.add(f)
                self.first.update(f.split())

        # remove any names of length 0 from sets
        self.first.discard("")
        self.middle.discard("")
        self.last.discard("")

    def _name_rep_impl(self) -> List[str]:
        """Build regex patterns for the ways this person may be written.

        Combines first / middle / last names (and their initials) in several
        orders, then pairs every name pattern with every spelling of the
        indicator (e.g. RW1, (RW1), R/W/1, R/W1).

        :returns: Pattern strings, longest first; ties are ordered
            alphabetically so the result is deterministic.
        """
        ws = r"\s+"
        comma = r"\s*,\s+"
        dot = r"\."

        # Empty names are skipped: they have no initial and would otherwise
        # produce a pattern that matches at every word boundary.
        last_literals = [re.escape(last) for last in self.last if last]
        # Pair each escaped name with its escaped initial. The initial must
        # be escaped from the raw name: slicing the escaped literal would
        # return a lone backslash for names starting with a metacharacter.
        first_literals = [(re.escape(f), re.escape(f[0])) for f in self.first if f]
        middle_literals = [(re.escape(m), re.escape(m[0])) for m in self.middle if m]

        reps = set(last_literals)
        reps.update(f for f, _ in first_literals)

        for last in last_literals:
            for f, f_init in first_literals:
                reps.add(f + ws + last)  # first last
                reps.add(f_init + ws + last)  # f last
                reps.add(f_init + dot + last)  # f.last
                reps.add(f_init + dot + ws + last)  # f. last
                reps.add(last + comma + f)  # last, first
                reps.add(last + ws + f_init)  # last f
                reps.add(last + ws + f_init + dot)  # last f.
                reps.add(last + comma + f_init)  # last, f
                reps.add(last + comma + f_init + dot)  # last, f.
                # last first - for if name input is accidentally reversed
                reps.add(last + ws + f)

        for f, _ in first_literals:
            for m, m_init in middle_literals:
                for last in last_literals:
                    reps.add(f + ws + m + ws + last)  # first middle last
                    reps.add(f + ws + m_init + ws + last)  # first m last
                    reps.add(f + ws + m_init + dot + ws + last)  # first m. last
                    reps.add(last + comma + f + ws + m)  # last, first middle
                    reps.add(last + comma + f + ws + m_init)  # last, first m
                    reps.add(last + comma + f + ws + m_init + dot)  # last, first m.
                    reps.add(m + ws + last)  # middle last

        # Every name pattern must end on a word boundary. The leading
        # boundary is added later, after the indicator combinations are
        # built from the un-prefixed form.
        reps = {r"%s\b" % x for x in reps}
        indicators: Set[str] = set()
        indicator_reps: Set[str] = set()
        if self.indicator:
            indicator_esc = re.escape(self.indicator)
            # Slash after every letter, and slash only between letters.
            slash_base = re.sub(r"([A-Za-z])", r"\1/", indicator_esc)
            middle_slash_base = re.sub(
                r"([A-Za-z])(?=[A-Za-z])", r"\1/", indicator_esc
            )
            indicators = {
                r"\W%s" % indicator_esc,  # RW1
                r"\(%s\)" % indicator_esc,  # (RW1)
                r"\W%s" % slash_base,  # R/W/1
                r"\(%s\)" % slash_base,  # (R/W/1)
                r"\W%s" % middle_slash_base,  # R/W1
                r"\(%s\)" % middle_slash_base,  # (R/W1)
            }

        # Indicator directly before or after a name, optionally separated
        # by whitespace.
        for indicator in indicators:
            for rep in reps:
                indicator_reps.add(r"%s\s*%s" % (indicator, rep))
                indicator_reps.add(r"%s\s*%s" % (rep, indicator))

        reps = {r"\b%s" % x for x in reps}
        reps = reps.union(indicators).union(indicator_reps)

        # the longest representation first for replacement purpose
        return sorted(reps, key=lambda x: (-len(x), x))

    def merge(self, other):
        """Fold ``other``'s identifiers and names into this person.

        Raises ``MergeDifferentPersonsError`` when the two do not compare
        equal.
        """
        if self != other:
            raise MergeDifferentPersonsError()

        if self.sfno is None:
            self.sfno = other.sfno
        # NOTE(review): court_no is not carried over from `other` even
        # though __eq__ relies on it -- confirm whether that is intended.
        self.id_triplet = self.id_triplet.union(other.id_triplet)
        self.first = self.first.union(other.first)
        self.middle = self.middle.union(other.middle)
        self.last = self.last.union(other.last)
        self.alias = self.alias.union(other.alias)

    @classmethod
    def dedupe(cls, persons: List["PersonName"], locale: Locale) -> List["PersonName"]:
        """De-duplicate PersonName list.

        After the parent class has de-duplicated the list, every remaining
        person is assigned a CSS class, a color, and a code name.

        :param persons: List of persons
        :param locale: Current location information
        :returns: De-duplicated list
        """
        persons = super(PersonName, cls).dedupe(persons, locale)

        ref_names = locale.indicators

        # Running counter per reference name, shared across all persons.
        type_counts: DefaultDict[str, int] = defaultdict(int)
        for position, p in enumerate(persons):
            # TODO(jnu): derive these values in a cleaner way
            p.cls = "masked-suspect" if p.is_chargeable() else "masked-person"
            # Cycle through the palette by position in the list.
            p.color = WEB_COLORS[position % len(WEB_COLORS)]

            # redact with custom label if present
            if p.custom_label:
                p.code_name = p.custom_label
                p.full_code_name = p.custom_label
                continue

            code_name_parts = []
            full_name_parts = []
            # Sorted so that counters are assigned in a reproducible order.
            ptypes = sorted(
                {ptype for _report_id, ptype, _pnum in p.id_triplet if ptype is not None}
            )
            for ptype in ptypes:
                cname = ref_names[ptype]
                type_counts[cname] += 1
                code_name_parts.append(ptype + str(type_counts[cname]))
                full_name_parts.append(cname + " " + str(type_counts[cname]))

            # if no code name found for current person type, use person
            if not code_name_parts:
                # Use default value from defaultdict,
                # which is supposed to be person
                cname = ref_names[""]
                type_counts[cname] += 1
                code_name_parts.append("PERSON_" + str(type_counts[cname]))
                full_name_parts.append(cname + " " + str(type_counts[cname]))

            # NOTE(jnu): deterministic order for code name
            p.code_name = "(%s)" % " / ".join(sorted(code_name_parts))
            p.full_code_name = " / ".join(sorted(full_name_parts))

        return persons