Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/csv.py: 23%
260 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-20 07:00 +0000
2"""
3csv.py - read/write/investigate CSV files
4"""
6import re
7from _csv import Error, __version__, writer, reader, register_dialect, \
8 unregister_dialect, get_dialect, list_dialects, \
9 field_size_limit, \
10 QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
11 __doc__
12from _csv import Dialect as _Dialect
14from io import StringIO
# Public names for "from csv import *": the quoting constants, the
# error/dialect machinery re-exported from _csv, and the classes
# defined in this module.
__all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
           "Error", "Dialect", "__doc__", "excel", "excel_tab",
           "field_size_limit", "reader", "writer",
           "register_dialect", "get_dialect", "list_dialects", "Sniffer",
           "unregister_dialect", "__version__", "DictReader", "DictWriter",
           "unix_dialect"]
class Dialect:
    """Describe a CSV dialect.

    This must be subclassed (see csv.excel). Valid attributes are:
    delimiter, quotechar, escapechar, doublequote, skipinitialspace,
    lineterminator, quoting.
    """
    _name = ""
    _valid = False
    # placeholders — subclasses must override these with concrete values
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self):
        # Class identity check: only subclasses count as usable dialects.
        # `is not` (identity) is the correct comparison for classes,
        # rather than equality.
        if self.__class__ is not Dialect:
            self._valid = True
        self._validate()

    def _validate(self):
        """Raise csv.Error if the attributes do not form a valid dialect."""
        try:
            # Delegate validation to the C implementation, which type- and
            # value-checks every dialect attribute.
            _Dialect(self)
        except TypeError as e:
            # We do this for compatibility with py2.3
            raise Error(str(e))
class excel(Dialect):
    """Dialect matching the CSV files Excel generates by default."""

    quoting = QUOTE_MINIMAL
    lineterminator = '\r\n'
    skipinitialspace = False
    doublequote = True
    quotechar = '"'
    delimiter = ','


register_dialect("excel", excel)
class excel_tab(excel):
    """Excel dialect variant for TAB-delimited export files."""

    delimiter = '\t'


register_dialect("excel-tab", excel_tab)
class unix_dialect(Dialect):
    """Dialect for CSV files as typically produced on Unix systems."""

    quoting = QUOTE_ALL
    lineterminator = '\n'
    skipinitialspace = False
    doublequote = True
    quotechar = '"'
    delimiter = ','


register_dialect("unix", unix_dialect)
class DictReader:
    """Iterate over CSV rows as dicts keyed by field names.

    Field names come from *fieldnames* when given, otherwise from the
    first row of the file.  Surplus values in a long row are collected
    under *restkey*; missing values in a short row are filled in with
    *restval*.
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="excel", *args, **kwds):
        self._fieldnames = fieldnames   # list of keys for the dict
        self.restkey = restkey          # key to catch long rows
        self.restval = restval          # default value for short rows
        self.reader = reader(f, dialect, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        # Lazily consume the header row from the underlying reader the
        # first time field names are requested.
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        if self.line_num == 0:
            # Accessed purely for its side effect: consume the header
            # before the first data row is read.
            self.fieldnames
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # Unlike the basic reader, skip blank rows entirely instead of
        # producing a dict full of None values.
        while row == []:
            row = next(self.reader)

        result = dict(zip(self.fieldnames, row))
        n_fields = len(self.fieldnames)
        n_values = len(row)
        if n_fields < n_values:
            # Long row: stash the surplus values under restkey.
            result[self.restkey] = row[n_fields:]
        elif n_fields > n_values:
            # Short row: pad the missing fields with restval.
            for missing_key in self.fieldnames[n_values:]:
                result[missing_key] = self.restval
        return result
class DictWriter:
    """Write dicts to a CSV file, projected onto a fixed field order."""

    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        self.fieldnames = fieldnames    # list of keys for the dict
        self.restval = restval          # filler for keys missing from a row
        if extrasaction.lower() not in ("raise", "ignore"):
            raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
                             % extrasaction)
        self.extrasaction = extrasaction
        self.writer = writer(f, dialect, *args, **kwds)

    def writeheader(self):
        """Emit the field names themselves as the first row."""
        return self.writerow(dict(zip(self.fieldnames, self.fieldnames)))

    def _dict_to_list(self, rowdict):
        # Reject unexpected keys when configured to raise, then project
        # the dict onto the declared field order (lazily, as a generator).
        if self.extrasaction == "raise":
            wrong_fields = rowdict.keys() - self.fieldnames
            if wrong_fields:
                raise ValueError("dict contains fields not in fieldnames: "
                                 + ", ".join([repr(x) for x in wrong_fields]))
        return (rowdict.get(key, self.restval) for key in self.fieldnames)

    def writerow(self, rowdict):
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        return self.writer.writerows(map(self._dict_to_list, rowdicts))
# Guard Sniffer's type checking against builds that exclude complex()
try:
    complex
except NameError:
    # Fall back to float so has_header() can still probe numeric columns.
    complex = float
class Sniffer:
    '''
    "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
    Returns a Dialect object.
    '''
    def __init__(self):
        # in case there is more than one possible delimiter
        self.preferred = [',', '\t', ';', ' ', ':']


    def sniff(self, sample, delimiters=None):
        """
        Returns a dialect (or None) corresponding to the sample
        """
        # First try the quote-based heuristic, which can discover both the
        # quote character and the delimiter at once; fall back to the
        # frequency-based delimiter guess when no quoting is present.
        quotechar, doublequote, delimiter, skipinitialspace = \
                   self._guess_quote_and_delimiter(sample, delimiters)
        if not delimiter:
            delimiter, skipinitialspace = self._guess_delimiter(sample,
                                                                delimiters)

        if not delimiter:
            raise Error("Could not determine delimiter")

        class dialect(Dialect):
            _name = "sniffed"
            lineterminator = '\r\n'
            quoting = QUOTE_MINIMAL
            # escapechar = ''

        dialect.doublequote = doublequote
        dialect.delimiter = delimiter
        # _csv.reader won't accept a quotechar of ''
        dialect.quotechar = quotechar or '"'
        dialect.skipinitialspace = skipinitialspace

        return dialect


    def _guess_quote_and_delimiter(self, data, delimiters):
        """
        Looks for text enclosed between two identical quotes
        (the probable quotechar) which are preceded and followed
        by the same character (the probable delimiter).
        For example:
                         ,'some text',
        The quote with the most wins, same with the delimiter.
        If there is no quotechar the delimiter can't be determined
        this way.
        """

        # Try progressively weaker patterns: quoted field with delimiter
        # on both sides, then at line start, then at line end, then a
        # bare quoted field spanning a whole line.  Stop at the first
        # pattern that matches anything.
        matches = []
        for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   # ".*?",
                      r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            # ".*?" (no delim, no space)
            regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
            matches = regexp.findall(data)
            if matches:
                break

        if not matches:
            # (quotechar, doublequote, delimiter, skipinitialspace)
            return ('', False, None, 0)
        # Tally candidate quote chars and delimiters across all matches;
        # groupindex maps named groups to their tuple position in findall
        # results (1-based, hence the -1).
        quotes = {}
        delims = {}
        spaces = 0
        groupindex = regexp.groupindex
        for m in matches:
            n = groupindex['quote'] - 1
            key = m[n]
            if key:
                quotes[key] = quotes.get(key, 0) + 1
            try:
                n = groupindex['delim'] - 1
                key = m[n]
            except KeyError:
                # last pattern has no 'delim' group
                continue
            if key and (delimiters is None or key in delimiters):
                delims[key] = delims.get(key, 0) + 1
            try:
                n = groupindex['space'] - 1
            except KeyError:
                continue
            if m[n]:
                spaces += 1

        # the quote character seen most often wins
        quotechar = max(quotes, key=quotes.get)

        if delims:
            delim = max(delims, key=delims.get)
            # if every delimiter occurrence was followed by a space,
            # assume skipinitialspace
            skipinitialspace = delims[delim] == spaces
            if delim == '\n': # most likely a file with a single column
                delim = ''
        else:
            # there is *no* delimiter, it's a single column of quoted data
            delim = ''
            skipinitialspace = 0

        # if we see an extra quote between delimiters, we've got a
        # double quoted format
        dq_regexp = re.compile(
                               r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
                               {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)

        if dq_regexp.search(data):
            doublequote = True
        else:
            doublequote = False

        return (quotechar, doublequote, delim, skipinitialspace)


    def _guess_delimiter(self, data, delimiters):
        """
        The delimiter /should/ occur the same number of times on
        each row. However, due to malformed data, it may not. We don't want
        an all or nothing approach, so we allow for small variations in this
        number.
          1) build a table of the frequency of each character on every line.
          2) build a table of frequencies of this frequency (meta-frequency?),
             e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
             7 times in 2 rows'
          3) use the mode of the meta-frequency to determine the /expected/
             frequency for that character
          4) find out how often the character actually meets that goal
          5) the character that best meets its goal is the delimiter
        For performance reasons, the data is evaluated in chunks, so it can
        try and evaluate the smallest portion of the data possible, evaluating
        additional chunks as necessary.
        """

        # drop empty lines
        data = list(filter(None, data.split('\n')))

        ascii = [chr(c) for c in range(127)] # 7-bit ASCII

        # build frequency tables
        chunkLength = min(10, len(data))
        iteration = 0
        charFrequency = {}
        modes = {}
        delims = {}
        start, end = 0, chunkLength
        while start < len(data):
            iteration += 1
            for line in data[start:end]:
                for char in ascii:
                    # metaFrequency maps per-line count -> number of lines
                    # with that count (the "meta-frequency" table)
                    metaFrequency = charFrequency.get(char, {})
                    # must count even if frequency is 0
                    freq = line.count(char)
                    # value is the mode
                    metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
                    charFrequency[char] = metaFrequency

            for char in charFrequency.keys():
                items = list(charFrequency[char].items())
                # skip characters that never appear on any line
                if len(items) == 1 and items[0][0] == 0:
                    continue
                # get the mode of the frequencies
                if len(items) > 1:
                    modes[char] = max(items, key=lambda x: x[1])
                    # adjust the mode - subtract the sum of all
                    # other frequencies
                    items.remove(modes[char])
                    modes[char] = (modes[char][0], modes[char][1]
                                   - sum(item[1] for item in items))
                else:
                    modes[char] = items[0]

            # build a list of possible delimiters
            modeList = modes.items()
            total = float(min(chunkLength * iteration, len(data)))
            # (rows of consistent data) / (number of rows) = 100%
            consistency = 1.0
            # minimum consistency threshold
            threshold = 0.9
            # progressively relax the required consistency until some
            # candidate delimiter qualifies, or the threshold is reached
            while len(delims) == 0 and consistency >= threshold:
                for k, v in modeList:
                    if v[0] > 0 and v[1] > 0:
                        if ((v[1]/total) >= consistency and
                            (delimiters is None or k in delimiters)):
                            delims[k] = v
                consistency -= 0.01

            if len(delims) == 1:
                delim = list(delims.keys())[0]
                skipinitialspace = (data[0].count(delim) ==
                                    data[0].count("%c " % delim))
                return (delim, skipinitialspace)

            # analyze another chunkLength lines
            start = end
            end += chunkLength

        if not delims:
            return ('', 0)

        # if there's more than one, fall back to a 'preferred' list
        if len(delims) > 1:
            for d in self.preferred:
                if d in delims.keys():
                    skipinitialspace = (data[0].count(d) ==
                                        data[0].count("%c " % d))
                    return (d, skipinitialspace)

        # nothing else indicates a preference, pick the character that
        # dominates(?)
        items = [(v,k) for (k,v) in delims.items()]
        items.sort()
        delim = items[-1][1]

        skipinitialspace = (data[0].count(delim) ==
                            data[0].count("%c " % delim))
        return (delim, skipinitialspace)


    def has_header(self, sample):
        """Heuristically decide whether the first row of *sample* is a header."""
        # Creates a dictionary of types of data in each column. If any
        # column is of a single type (say, integers), *except* for the first
        # row, then the first row is presumed to be labels. If the type
        # can't be determined, it is assumed to be a string in which case
        # the length of the string is the determining factor: if all of the
        # rows except for the first are the same length, it's a header.
        # Finally, a 'vote' is taken at the end for each column, adding or
        # subtracting from the likelihood of the first row being a header.

        rdr = reader(StringIO(sample), self.sniff(sample))

        header = next(rdr) # assume first row is header

        columns = len(header)
        columnTypes = {}
        for i in range(columns): columnTypes[i] = None

        checked = 0
        for row in rdr:
            # arbitrary number of rows to check, to keep it sane
            if checked > 20:
                break
            checked += 1

            if len(row) != columns:
                continue # skip rows that have irregular number of columns

            for col in list(columnTypes.keys()):

                # narrowest numeric type that can parse the cell wins
                for thisType in [int, float, complex]:
                    try:
                        thisType(row[col])
                        break
                    except (ValueError, OverflowError):
                        pass
                else:
                    # fallback to length of string
                    thisType = len(row[col])

                if thisType != columnTypes[col]:
                    if columnTypes[col] is None: # add new column type
                        columnTypes[col] = thisType
                    else:
                        # type is inconsistent, remove column from
                        # consideration
                        del columnTypes[col]

        # finally, compare results against first row and "vote"
        # on whether it's a header
        hasHeader = 0
        for col, colType in columnTypes.items():
            if type(colType) == type(0): # it's a length
                if len(header[col]) != colType:
                    hasHeader += 1
                else:
                    hasHeader -= 1
            else: # attempt typecast
                try:
                    colType(header[col])
                except (ValueError, TypeError):
                    hasHeader += 1
                else:
                    hasHeader -= 1

        return hasHeader > 0