Coverage for blind_charging/source_text.py: 87%

108 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-02-17 20:36 +0000

1"""Text container with utilities for applying redactions.""" 

2import os 

3import re 

4from typing import List, Optional, Tuple 

5 

6import spacy 

7from spacy.tokens import Doc, Span, Token 

8 

9from .annotation import Redaction 

10from .broken_range import BrokenRange 

11from .thunk import Thunk 

12 

# The spaCy model defaults to "en_core_web_lg"; set the BC_NLP_MODEL
# environment variable to a package path to swap it out at runtime.
# NOTE(jnu): the load is wrapped in a Thunk (lazy-load) so app CLI methods can
# work regardless of environment config.
nlp = Thunk(
    lambda: spacy.load(os.environ.get("BC_NLP_MODEL", "en_core_web_lg"))
)

17 

18 

19# Punctuation tokens that can end sentences. 

20_TERMINALS = {".", "!", "?", '"'} 

21 

22 

class OverlapError(Exception):
    """Signal that a requested redaction collides with an existing one.

    Raised by ``SourceText.redact`` when the requested span is not eligible
    for redaction and the caller did not pass ``force=True``.
    """

25 

26 

27def _last_non_space(span: Span) -> Optional[Token]: 

28 """Find the last non-space token in a span. 

29 

30 :param sent: Input span 

31 :returns: Non-space token if found, otherwise None 

32 """ 

33 for t in list(span)[::-1]: 

34 if t.pos_ != "SPACE": 

35 return t 

36 return None 

37 

38 

39def _capitalize(text: str) -> str: 

40 """Capitalize some text that might contain non-alphabetic characters. 

41 

42 E.g., "[placeholder]" -> "[Placeholder]" 

43 

44 :param text: Text to capitalize 

45 :returns: Capitalized text 

46 """ 

47 for i, c in enumerate(text): 

48 if c.isalpha(): 

49 return text[:i] + c.upper() + text[i + 1 :] 

50 return text 

51 

52 

53def _is_sent_start(doc: Doc, index: int) -> bool: 

54 """Test whether the given index is in the first token in a sentence. 

55 

56 :param doc: Spacy Doc 

57 :param start: Index to check 

58 :returns: True if index is within first word of a sentence 

59 """ 

60 seen_space = False 

61 # Limit search space to previous few tokens 

62 end_ptr = max(0, index - 3) 

63 while index >= end_ptr: 

64 char = doc.text[index] 

65 span = doc.char_span(index, index + 1) 

66 if span: 

67 tok = doc[span.start] 

68 if tok.pos_ == "PUNCT" and char in _TERMINALS: 

69 # Found punctuation: check if it's the last token in a 

70 # sentence. If it is, the initial word was the start of its 

71 # own sentence. 

72 return _last_non_space(tok.sent) == tok 

73 if re.match(r"\s", char): 

74 # Found a space 

75 seen_space = True 

76 elif seen_space: 

77 # Found another word: can't be beginning of the sentence. 

78 return False 

79 index -= 1 

80 

81 # Found the beginning of the string: must be start of sentence. 

82 return index == 0 

83 

84 

85def _clamp_to_word_boundary(doc: Doc, index: int, up: bool = True) -> int: 

86 """Move index to nearest word boundary if it points to a space. 

87 

88 :param doc: Spacy Document 

89 :param index: Start index 

90 :param up: Direction of clamp (default up; pass False to move index down) 

91 :returns: Clamped index 

92 """ 

93 delta = 1 if up else -1 

94 txt = doc.text 

95 while index > 0 and index < len(txt) - 1 and re.match(r"\s", txt[index]): 

96 index += delta 

97 return index 

98 

99 

100def _get_indefinite_article_for_text(text: str) -> str: 

101 """Get the indefinite article to use for the given text. 

102 

103 Uses heuristics based on word-initial orthography. Not completely accurate. 

104 

105 :param text: Input text 

106 :returns: "a" or "an" 

107 """ 

108 needs_epenthesis = False 

109 for _, c in enumerate(text): 

110 if c.isalpha(): 

111 # TODO(jnu): the rules are more complicated, but use simple vowel 

112 # orthography to catch most cases. 

113 # (Words like "one" fail to follow this rule.) 

114 needs_epenthesis = c.lower() in {"a", "e", "i", "o", "u"} 

115 break 

116 elif c.isdigit(): 

117 # TODO(jnu): Again, not a perfect rule, but good enough to start. 

118 # Catches things like "An 8-digit number" 

119 needs_epenthesis = c == "8" 

120 break 

121 return "an" if needs_epenthesis else "a" 

122 

123 

124def _correct_indef_article(doc: Doc, text: str, index: int) -> Tuple[str, int]: 

125 """Expand redaction to encompass the indefinite article, if necessary. 

126 

127 E.g., "an African-American male" -> "a [race/ethnicity] male" 

128 

129 If the preceding word is an indefinite article, we include it in the 

130 redacted text, replaced with the correct definite article for the 

131 substituted text. Do this because the epenthetic 'n' of the article could 

132 give away information about the underlying text, and also to make the 

133 text read more smoothly. 

134 

135 :param doc: Spacy Document 

136 :param text: Text to substitute as redaction 

137 :param index: Index where text will be inserted 

138 :returns: Tuple containing correct redaction text and annotation start 

139 index (which redacts the article as well). 

140 """ 

141 correct_article = _get_indefinite_article_for_text(text) 

142 scanned_words: List[str] = [] 

143 word_terminal_index: List[int] = [] 

144 space_str = "" 

145 scanning_word = False 

146 

147 ptr = index 

148 # At max we only need to search 4 tokens previous to this one. 

149 end_ptr = max(0, ptr - 4) 

150 

151 while ptr >= end_ptr: 

152 char = doc.text[ptr] 

153 # NOTE: Only match simple spaces. Newlines and tabs will probably 

154 # result in overmatching. 

155 if char == " ": 

156 # Break if we found two words. Note the loop will automatically 

157 # break when we reach the 0 index. 

158 if len(scanned_words) == 2: 

159 break 

160 space_str = char + space_str 

161 scanning_word = False 

162 else: 

163 if not scanning_word: 

164 scanning_word = True 

165 scanned_words.append("") 

166 word_terminal_index.append(ptr + 1) 

167 scanned_words[-1] = char + scanned_words[-1] 

168 ptr -= 1 

169 

170 # If the preceding word was not the indefinite article, return 

171 if len(scanned_words) < 2 or scanned_words[-1].lower() not in {"a", "an"}: 

172 return text, index 

173 

174 # Match case when substituting article 

175 existing_article = scanned_words[-1] 

176 if existing_article.isupper(): 

177 correct_article = correct_article.upper() 

178 elif existing_article[0].isupper(): 

179 correct_article = correct_article.capitalize() 

180 

181 new_text = correct_article + space_str + text 

182 new_idx = word_terminal_index[-1] - len(scanned_words[-1]) 

183 return new_text, new_idx 

184 

185 

class SourceText:
    """Mutable holder for a text while redactions are applied to it.

    Redaction rules run in priority order. Each applied redaction blanks out
    its span in ``text`` (keeping the length unchanged) and records the span
    in ``cleared``, so later rules do not re-match text that has already been
    redacted.

    The NLP document in ``nlp`` is built once from the original text and is
    never modified, so its entities stay available to every rule regardless
    of the order in which rules run.
    """

    def __init__(self, text: str):
        # Working copy of the text; cleared spans get overwritten in place.
        self.text = text
        # Analysis of the original (unredacted) text.
        self.nlp = nlp(text)
        # Record of every span that has been cleared so far.
        self.cleared = BrokenRange()

    def clear_span(self, start: int, end: int, placeholder="*"):
        """Blank out a span of the working text without changing its length.

        :param start: Start of extent to clear
        :param end: End of extent to clear (exclusive)
        :param placeholder: Placeholder character to use in span
        """
        width = end - start
        # Repeat the placeholder and trim it back to the span width, which
        # copes with placeholders longer than one character.
        mask = (placeholder * width)[:width]
        self.text = "".join((self.text[:start], mask, self.text[end:]))
        # Track the spans that have been redacted
        self.cleared.addspan(start, end)

    def can_redact(self, start: int, end: int) -> bool:
        """Check that the given span does not touch an existing redaction.

        :param start: Start position of the span
        :param end: End position of the span; handed unchanged to the
            cleared-range overlap check.
            NOTE(review): confirm whether that check treats it as inclusive.
        :returns: Boolean indicating whether span is eligible for redaction
        """
        already_cleared = self.cleared.overlaps(start, end)
        return not already_cleared

    def redact(
        self,
        start: int,
        end: int,
        text: str,
        clamp: bool = True,
        auto_capitalize: bool = True,
        autocorrect_article: bool = True,
        force: bool = False,
        **kwargs: str
    ) -> Redaction:
        """Replace a span of the text with the given redaction text.

        :param start: Start of span (position of first character in span)
        :param end: End of span (position immediately after the last
            character in span)
        :param text: Replacement text
        :param clamp: Shrink the span so that it does not begin or end on
            whitespace
        :param auto_capitalize: Capitalize the replacement when the span
            begins a sentence
        :param autocorrect_article: Automatically modify redaction to account
            for any preceding indefinite article (i.e., "a" vs. "an")
        :param force: Don't throw an error if the span overlaps with a span
            that has already been redacted.
        :param **kwargs: Passed to Redaction constructor
        :returns: Redaction
        :raises OverlapError: If the suggested span would overlap with another
            existing redaction.
        """
        if not (force or self.can_redact(start, end)):
            raise OverlapError("Invalid span: {} - {}".format(start, end))

        doc = self.nlp

        if clamp:
            start = _clamp_to_word_boundary(doc, start, up=True)
            # `end` points one past the redaction, so clamp the last covered
            # character instead and convert back afterwards.
            last_char = _clamp_to_word_boundary(doc, end - 1, up=False)
            end = last_char + 1

        if autocorrect_article:
            # May widen the span to the left to swallow a preceding article.
            text, start = _correct_indef_article(doc, text, start)

        if auto_capitalize and _is_sent_start(doc, start):
            text = _capitalize(text)

        self.clear_span(start, end)
        return Redaction(start, end, text, **kwargs)