Coverage for blind_charging/source_text.py: 87%
108 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-02-17 20:36 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-02-17 20:36 +0000
1"""Text container with utilities for applying redactions."""
2import os
3import re
4from typing import List, Optional, Tuple
6import spacy
7from spacy.tokens import Doc, Span, Token
9from .annotation import Redaction
10from .broken_range import BrokenRange
11from .thunk import Thunk
# The model can be swapped out at runtime by setting the BC_NLP_MODEL
# environment variable to the name of (or a path to) a model package; when the
# variable is unset, "en_core_web_lg" is loaded.
# NOTE(jnu): lazy-load NLP model so app CLI methods can work regardless of
# environment config. The load is wrapped in a lambda handed to Thunk, so
# nothing is loaded at import time.
nlp = Thunk(lambda: spacy.load(os.getenv("BC_NLP_MODEL", "en_core_web_lg")))


# Punctuation tokens that can end sentences. The double quote is included so
# that a sentence closed by a quotation mark is recognized as well.
_TERMINALS = {".", "!", "?", '"'}
class OverlapError(Exception):
    """Signal that a requested redaction collides with an existing one.

    Raised when a span that is about to be redacted overlaps text that has
    already been redacted.
    """
27def _last_non_space(span: Span) -> Optional[Token]:
28 """Find the last non-space token in a span.
30 :param sent: Input span
31 :returns: Non-space token if found, otherwise None
32 """
33 for t in list(span)[::-1]:
34 if t.pos_ != "SPACE":
35 return t
36 return None
39def _capitalize(text: str) -> str:
40 """Capitalize some text that might contain non-alphabetic characters.
42 E.g., "[placeholder]" -> "[Placeholder]"
44 :param text: Text to capitalize
45 :returns: Capitalized text
46 """
47 for i, c in enumerate(text):
48 if c.isalpha():
49 return text[:i] + c.upper() + text[i + 1 :]
50 return text
53def _is_sent_start(doc: Doc, index: int) -> bool:
54 """Test whether the given index is in the first token in a sentence.
56 :param doc: Spacy Doc
57 :param start: Index to check
58 :returns: True if index is within first word of a sentence
59 """
60 seen_space = False
61 # Limit search space to previous few tokens
62 end_ptr = max(0, index - 3)
63 while index >= end_ptr:
64 char = doc.text[index]
65 span = doc.char_span(index, index + 1)
66 if span:
67 tok = doc[span.start]
68 if tok.pos_ == "PUNCT" and char in _TERMINALS:
69 # Found punctuation: check if it's the last token in a
70 # sentence. If it is, the initial word was the start of its
71 # own sentence.
72 return _last_non_space(tok.sent) == tok
73 if re.match(r"\s", char):
74 # Found a space
75 seen_space = True
76 elif seen_space:
77 # Found another word: can't be beginning of the sentence.
78 return False
79 index -= 1
81 # Found the beginning of the string: must be start of sentence.
82 return index == 0
85def _clamp_to_word_boundary(doc: Doc, index: int, up: bool = True) -> int:
86 """Move index to nearest word boundary if it points to a space.
88 :param doc: Spacy Document
89 :param index: Start index
90 :param up: Direction of clamp (default up; pass False to move index down)
91 :returns: Clamped index
92 """
93 delta = 1 if up else -1
94 txt = doc.text
95 while index > 0 and index < len(txt) - 1 and re.match(r"\s", txt[index]):
96 index += delta
97 return index
100def _get_indefinite_article_for_text(text: str) -> str:
101 """Get the indefinite article to use for the given text.
103 Uses heuristics based on word-initial orthography. Not completely accurate.
105 :param text: Input text
106 :returns: "a" or "an"
107 """
108 needs_epenthesis = False
109 for _, c in enumerate(text):
110 if c.isalpha():
111 # TODO(jnu): the rules are more complicated, but use simple vowel
112 # orthography to catch most cases.
113 # (Words like "one" fail to follow this rule.)
114 needs_epenthesis = c.lower() in {"a", "e", "i", "o", "u"}
115 break
116 elif c.isdigit():
117 # TODO(jnu): Again, not a perfect rule, but good enough to start.
118 # Catches things like "An 8-digit number"
119 needs_epenthesis = c == "8"
120 break
121 return "an" if needs_epenthesis else "a"
124def _correct_indef_article(doc: Doc, text: str, index: int) -> Tuple[str, int]:
125 """Expand redaction to encompass the indefinite article, if necessary.
127 E.g., "an African-American male" -> "a [race/ethnicity] male"
129 If the preceding word is an indefinite article, we include it in the
130 redacted text, replaced with the correct definite article for the
131 substituted text. Do this because the epenthetic 'n' of the article could
132 give away information about the underlying text, and also to make the
133 text read more smoothly.
135 :param doc: Spacy Document
136 :param text: Text to substitute as redaction
137 :param index: Index where text will be inserted
138 :returns: Tuple containing correct redaction text and annotation start
139 index (which redacts the article as well).
140 """
141 correct_article = _get_indefinite_article_for_text(text)
142 scanned_words: List[str] = []
143 word_terminal_index: List[int] = []
144 space_str = ""
145 scanning_word = False
147 ptr = index
148 # At max we only need to search 4 tokens previous to this one.
149 end_ptr = max(0, ptr - 4)
151 while ptr >= end_ptr:
152 char = doc.text[ptr]
153 # NOTE: Only match simple spaces. Newlines and tabs will probably
154 # result in overmatching.
155 if char == " ":
156 # Break if we found two words. Note the loop will automatically
157 # break when we reach the 0 index.
158 if len(scanned_words) == 2:
159 break
160 space_str = char + space_str
161 scanning_word = False
162 else:
163 if not scanning_word:
164 scanning_word = True
165 scanned_words.append("")
166 word_terminal_index.append(ptr + 1)
167 scanned_words[-1] = char + scanned_words[-1]
168 ptr -= 1
170 # If the preceding word was not the indefinite article, return
171 if len(scanned_words) < 2 or scanned_words[-1].lower() not in {"a", "an"}:
172 return text, index
174 # Match case when substituting article
175 existing_article = scanned_words[-1]
176 if existing_article.isupper():
177 correct_article = correct_article.upper()
178 elif existing_article[0].isupper():
179 correct_article = correct_article.capitalize()
181 new_text = correct_article + space_str + text
182 new_idx = word_terminal_index[-1] - len(scanned_words[-1])
183 return new_text, new_idx
class SourceText(object):
    """A stateful container for text undergoing redaction.

    This container supplies a stateful representation of the text while
    redaction is in progress, so that redactions can be applied in priority
    order without later rules re-matching text that has already been redacted.

    The container also provides NLP entities based on the original source text
    which are always available to rules, regardless of order.
    """

    def __init__(self, text: str):
        """Wrap some text for redaction.

        :param text: Original source text
        """
        # Working copy of the text; redacted spans get overwritten in place.
        self.text = text
        # NLP analysis of the original text. It is computed once here and is
        # not updated as redactions are applied.
        self.nlp = nlp(text)
        # Record of the spans that have been redacted so far.
        self.cleared = BrokenRange()

    def clear_span(self, start: int, end: int, placeholder="*"):
        """Clear a span in the source text while preserving the text length.

        :param start: Start of extent to clear
        :param end: End of extent to clear (exclusive)
        :param placeholder: Placeholder character to use in span
        :raises ValueError: If the placeholder is empty or the span is
            inverted; either would change the length of the text.
        """
        if not placeholder:
            raise ValueError("placeholder must not be empty")
        if end < start:
            raise ValueError("Invalid span: {} - {}".format(start, end))

        extent = end - start
        # Generate a new replacement span, but ensure that the length is
        # correct. This deals with the case that the input placeholder was
        # longer than one character.
        new_span = (placeholder * extent)[:extent]
        self.text = self.text[:start] + new_span + self.text[end:]
        # Track the spans that have been redacted
        self.cleared.addspan(start, end)

    def can_redact(self, start: int, end: int) -> bool:
        """Ensure that no part of the given span is already redacted.

        Both positions are passed unchanged to the overlap check of the
        record of cleared spans.

        :param start: Start position of the span
        :param end: End position of the span
        :returns: Boolean indicating whether span is eligible for redaction
        """
        return not self.cleared.overlaps(start, end)

    def redact(
        self,
        start: int,
        end: int,
        text: str,
        clamp: bool = True,
        auto_capitalize: bool = True,
        autocorrect_article: bool = True,
        force: bool = False,
        **kwargs: str
    ) -> Redaction:
        """Redact a span of text with the given replacement.

        :param start: Start of span (position of first character in span)
        :param end: End of span (position immediately after the last
            character in span)
        :param text: Replacement text
        :param clamp: Ensure the redaction fits neatly at word boundaries
        :param auto_capitalize: Infer and apply capitalization from underlying
            text span.
        :param autocorrect_article: Automatically modify redaction to account
            for any preceding indefinite article (i.e., "a" vs. "an")
        :param force: Don't throw an error if the span overlaps with a span
            that has already been redacted.
        :param **kwargs: Passed to Redaction constructor
        :returns: Redaction
        :raises OverlapError: If the suggested span would overlap with another
            existing redaction.
        """
        if not force and not self.can_redact(start, end):
            raise OverlapError("Invalid span: {} - {}".format(start, end))

        if clamp:
            clamped_start = _clamp_to_word_boundary(self.nlp, start, up=True)
            # Correct for off-by-1 errors, since `end` technically points to
            # the character immediately following the redaction.
            clamped_end = (
                _clamp_to_word_boundary(self.nlp, end - 1, up=False) + 1
            )
            # A span made up only of whitespace clamps past itself (start
            # ends up after end), which would corrupt the text when the span
            # is cleared. Keep the caller's span when clamping leaves nothing.
            if clamped_start < clamped_end:
                start, end = clamped_start, clamped_end

        if autocorrect_article:
            # May move `start` back to swallow a preceding "a"/"an".
            text, start = _correct_indef_article(self.nlp, text, start)

        if auto_capitalize and _is_sent_start(self.nlp, start):
            text = _capitalize(text)

        self.clear_span(start, end)
        return Redaction(start, end, text, **kwargs)