Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/csv.py: 23%

260 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-20 07:00 +0000

1 

2""" 

3csv.py - read/write/investigate CSV files 

4""" 

5 

6import re 

7from _csv import Error, __version__, writer, reader, register_dialect, \ 

8 unregister_dialect, get_dialect, list_dialects, \ 

9 field_size_limit, \ 

10 QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \ 

11 __doc__ 

12from _csv import Dialect as _Dialect 

13 

14from io import StringIO 

15 

16__all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE", 

17 "Error", "Dialect", "__doc__", "excel", "excel_tab", 

18 "field_size_limit", "reader", "writer", 

19 "register_dialect", "get_dialect", "list_dialects", "Sniffer", 

20 "unregister_dialect", "__version__", "DictReader", "DictWriter", 

21 "unix_dialect"] 

22 

class Dialect:
    """Base class describing a CSV dialect.

    Not usable directly; subclass it (see csv.excel) and fill in the
    attributes: delimiter, quotechar, escapechar, doublequote,
    skipinitialspace, lineterminator, quoting.
    """
    _name = ""
    _valid = False
    # attribute placeholders, overridden by concrete subclasses
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self):
        # Only subclasses count as valid dialects; instantiating the
        # base class directly leaves _valid False and fails validation.
        if type(self) is not Dialect:
            self._valid = True
        self._validate()

    def _validate(self):
        # Delegate the real attribute checking to the C implementation,
        # normalizing its TypeError into csv.Error.
        try:
            _Dialect(self)
        except TypeError as e:
            # We do this for compatibility with py2.3
            raise Error(str(e))

53 

class excel(Dialect):
    """Dialect matching the CSV flavour produced by Excel."""
    delimiter = ','
    quotechar = '"'
    lineterminator = '\r\n'
    doublequote = True
    skipinitialspace = False
    quoting = QUOTE_MINIMAL

register_dialect("excel", excel)

63 

class excel_tab(excel):
    """Excel dialect, but TAB-delimited instead of comma-delimited."""
    delimiter = '\t'

register_dialect("excel-tab", excel_tab)

68 

class unix_dialect(Dialect):
    """Dialect for Unix-style CSV: LF line endings, everything quoted."""
    delimiter = ','
    quotechar = '"'
    lineterminator = '\n'
    doublequote = True
    skipinitialspace = False
    quoting = QUOTE_ALL

register_dialect("unix", unix_dialect)

78 

79 

class DictReader:
    """Iterate over CSV rows as dicts keyed by field names.

    Field names come either from the *fieldnames* argument or, lazily,
    from the first row of the file.  Rows longer than the field list
    collect their surplus values in a list under *restkey*; rows
    shorter than the field list fill the missing keys with *restval*.
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None,
                 dialect="excel", *args, **kwds):
        self._fieldnames = fieldnames   # dict keys; None -> read from file
        self.restkey = restkey          # key that collects surplus values
        self.restval = restval          # filler for missing values
        self.reader = reader(f, dialect, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        # Lazily consume the header row the first time names are needed.
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        if self.line_num == 0:
            self.fieldnames  # side effect only: consumes the header row
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # Skip blank rows entirely: unlike the basic reader we prefer
        # not to emit them, since they would become dicts full of None.
        while row == []:
            row = next(self.reader)

        names = self.fieldnames
        result = dict(zip(names, row))
        if len(names) < len(row):
            # long row: stash the surplus values under restkey
            result[self.restkey] = row[len(names):]
        else:
            # short (or exact) row: pad any remaining keys with restval
            for name in names[len(row):]:
                result[name] = self.restval
        return result

128 

129 

class DictWriter:
    """Write dicts to a CSV file, emitting values in fieldnames order.

    Keys missing from a row are written as *restval*; keys not listed
    in *fieldnames* either raise ValueError or are silently dropped,
    according to *extrasaction* ("raise" or "ignore").
    """

    def __init__(self, f, fieldnames, restval="", extrasaction="raise",
                 dialect="excel", *args, **kwds):
        if extrasaction.lower() not in ("raise", "ignore"):
            raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
                             % extrasaction)
        self.fieldnames = fieldnames    # column order for every row
        self.restval = restval          # value written for absent keys
        self.extrasaction = extrasaction
        self.writer = writer(f, dialect, *args, **kwds)

    def writeheader(self):
        # The header is simply a row mapping every field to its own name.
        return self.writerow({name: name for name in self.fieldnames})

    def _dict_to_list(self, rowdict):
        # Flatten a row dict into values ordered by self.fieldnames.
        if self.extrasaction == "raise":
            wrong_fields = rowdict.keys() - self.fieldnames
            if wrong_fields:
                raise ValueError("dict contains fields not in fieldnames: "
                                 + ", ".join([repr(x) for x in wrong_fields]))
        return (rowdict.get(name, self.restval) for name in self.fieldnames)

    def writerow(self, rowdict):
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        return self.writer.writerows(self._dict_to_list(d) for d in rowdicts)

158 

# Guard Sniffer's type checking against builds that exclude complex()
try:
    complex
except NameError:
    # Rebind the missing builtin to float so has_header()'s numeric
    # type probing (int -> float -> complex) still works.
    complex = float

164 

class Sniffer:
    '''
    "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
    Returns a Dialect object.
    '''
    def __init__(self):
        # in case there is more than one possible delimiter, prefer
        # these (in order) over whichever merely occurs most often
        self.preferred = [',', '\t', ';', ' ', ':']


    def sniff(self, sample, delimiters=None):
        """
        Return a Dialect subclass (a class, not an instance) describing
        the format of *sample*.  *delimiters*, if given, restricts the
        set of candidate delimiter characters.  Raises Error when no
        delimiter can be determined.
        """

        # First try the quote-based guess; fall back to the character
        # frequency analysis when no quoted fields were found.
        quotechar, doublequote, delimiter, skipinitialspace = \
                   self._guess_quote_and_delimiter(sample, delimiters)
        if not delimiter:
            delimiter, skipinitialspace = self._guess_delimiter(sample,
                                                                delimiters)

        if not delimiter:
            raise Error("Could not determine delimiter")

        class dialect(Dialect):
            _name = "sniffed"
            lineterminator = '\r\n'
            quoting = QUOTE_MINIMAL
            # escapechar = ''

        dialect.doublequote = doublequote
        dialect.delimiter = delimiter
        # _csv.reader won't accept a quotechar of ''
        dialect.quotechar = quotechar or '"'
        dialect.skipinitialspace = skipinitialspace

        return dialect


    def _guess_quote_and_delimiter(self, data, delimiters):
        """
        Looks for text enclosed between two identical quotes
        (the probable quotechar) which are preceded and followed
        by the same character (the probable delimiter).
        For example:
                         ,'some text',
        The quote with the most wins, same with the delimiter.
        If there is no quotechar the delimiter can't be determined
        this way.

        Returns a (quotechar, doublequote, delimiter, skipinitialspace)
        tuple; delimiter is None when nothing quoted was found.
        """

        # Patterns are tried from most specific (delimiter on both
        # sides of the quoted field) to least (quoted field alone on
        # its line); the first one that matches anything wins.
        matches = []
        for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
                      r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
                      r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
            regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
            matches = regexp.findall(data)
            if matches:
                break

        if not matches:
            # (quotechar, doublequote, delimiter, skipinitialspace)
            return ('', False, None, 0)
        quotes = {}
        delims = {}
        spaces = 0
        # findall() returns plain tuples, so named groups are reached
        # through their (1-based) index in groupindex.
        groupindex = regexp.groupindex
        for m in matches:
            n = groupindex['quote'] - 1
            key = m[n]
            if key:
                quotes[key] = quotes.get(key, 0) + 1
            try:
                n = groupindex['delim'] - 1
                key = m[n]
            except KeyError:
                # pattern without a delim group (the last restr)
                continue
            if key and (delimiters is None or key in delimiters):
                delims[key] = delims.get(key, 0) + 1
            try:
                n = groupindex['space'] - 1
            except KeyError:
                continue
            if m[n]:
                spaces += 1

        # most frequently seen quote character wins
        quotechar = max(quotes, key=quotes.get)

        if delims:
            delim = max(delims, key=delims.get)
            # skipinitialspace only if *every* delimiter was followed
            # by a space
            skipinitialspace = delims[delim] == spaces
            if delim == '\n': # most likely a file with a single column
                delim = ''
        else:
            # there is *no* delimiter, it's a single column of quoted data
            delim = ''
            skipinitialspace = 0

        # if we see an extra quote between delimiters, we've got a
        # double quoted format
        dq_regexp = re.compile(
                               r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
                               {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)



        if dq_regexp.search(data):
            doublequote = True
        else:
            doublequote = False

        return (quotechar, doublequote, delim, skipinitialspace)


    def _guess_delimiter(self, data, delimiters):
        """
        The delimiter /should/ occur the same number of times on
        each row. However, due to malformed data, it may not. We don't want
        an all or nothing approach, so we allow for small variations in this
        number.
          1) build a table of the frequency of each character on every line.
          2) build a table of frequencies of this frequency (meta-frequency?),
             e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
             7 times in 2 rows'
          3) use the mode of the meta-frequency to determine the /expected/
             frequency for that character
          4) find out how often the character actually meets that goal
          5) the character that best meets its goal is the delimiter
        For performance reasons, the data is evaluated in chunks, so it can
        try and evaluate the smallest portion of the data possible, evaluating
        additional chunks as necessary.

        Returns a (delimiter, skipinitialspace) tuple; delimiter is ''
        when no candidate survives.
        """

        # drop empty lines so they don't skew the frequency counts
        data = list(filter(None, data.split('\n')))

        ascii = [chr(c) for c in range(127)] # 7-bit ASCII

        # build frequency tables
        chunkLength = min(10, len(data))
        iteration = 0
        charFrequency = {}
        modes = {}
        delims = {}
        start, end = 0, chunkLength
        while start < len(data):
            iteration += 1
            for line in data[start:end]:
                for char in ascii:
                    # metaFrequency maps per-line count -> number of
                    # lines with that count, for this character
                    metaFrequency = charFrequency.get(char, {})
                    # must count even if frequency is 0
                    freq = line.count(char)
                    # value is the mode
                    metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
                    charFrequency[char] = metaFrequency

            for char in charFrequency.keys():
                items = list(charFrequency[char].items())
                if len(items) == 1 and items[0][0] == 0:
                    # character never appears on any line; skip it
                    continue
                # get the mode of the frequencies
                if len(items) > 1:
                    modes[char] = max(items, key=lambda x: x[1])
                    # adjust the mode - subtract the sum of all
                    # other frequencies
                    items.remove(modes[char])
                    modes[char] = (modes[char][0], modes[char][1]
                                   - sum(item[1] for item in items))
                else:
                    modes[char] = items[0]

            # build a list of possible delimiters
            modeList = modes.items()
            total = float(min(chunkLength * iteration, len(data)))
            # (rows of consistent data) / (number of rows) = 100%
            consistency = 1.0
            # minimum consistency threshold
            threshold = 0.9
            # relax the required consistency in 1% steps until at least
            # one candidate qualifies or the threshold is reached
            while len(delims) == 0 and consistency >= threshold:
                for k, v in modeList:
                    if v[0] > 0 and v[1] > 0:
                        if ((v[1]/total) >= consistency and
                            (delimiters is None or k in delimiters)):
                            delims[k] = v
                consistency -= 0.01

            if len(delims) == 1:
                # unambiguous winner; decide skipinitialspace by whether
                # every delimiter on the first line is followed by a space
                delim = list(delims.keys())[0]
                skipinitialspace = (data[0].count(delim) ==
                                    data[0].count("%c " % delim))
                return (delim, skipinitialspace)

            # analyze another chunkLength lines
            start = end
            end += chunkLength

        if not delims:
            return ('', 0)

        # if there's more than one, fall back to a 'preferred' list
        if len(delims) > 1:
            for d in self.preferred:
                if d in delims.keys():
                    skipinitialspace = (data[0].count(d) ==
                                        data[0].count("%c " % d))
                    return (d, skipinitialspace)

        # nothing else indicates a preference, pick the character that
        # dominates(?)
        items = [(v,k) for (k,v) in delims.items()]
        items.sort()
        delim = items[-1][1]

        skipinitialspace = (data[0].count(delim) ==
                            data[0].count("%c " % delim))
        return (delim, skipinitialspace)


    def has_header(self, sample):
        """Heuristically decide whether *sample*'s first row is a header."""
        # Creates a dictionary of types of data in each column. If any
        # column is of a single type (say, integers), *except* for the first
        # row, then the first row is presumed to be labels. If the type
        # can't be determined, it is assumed to be a string in which case
        # the length of the string is the determining factor: if all of the
        # rows except for the first are the same length, it's a header.
        # Finally, a 'vote' is taken at the end for each column, adding or
        # subtracting from the likelihood of the first row being a header.

        rdr = reader(StringIO(sample), self.sniff(sample))

        header = next(rdr) # assume first row is header

        columns = len(header)
        columnTypes = {}
        for i in range(columns): columnTypes[i] = None

        checked = 0
        for row in rdr:
            # arbitrary number of rows to check, to keep it sane
            if checked > 20:
                break
            checked += 1

            if len(row) != columns:
                continue # skip rows that have irregular number of columns

            # iterate over a copy of the keys since entries may be deleted
            for col in list(columnTypes.keys()):

                for thisType in [int, float, complex]:
                    try:
                        thisType(row[col])
                        break
                    except (ValueError, OverflowError):
                        pass
                else:
                    # fallback to length of string
                    thisType = len(row[col])

                if thisType != columnTypes[col]:
                    if columnTypes[col] is None: # add new column type
                        columnTypes[col] = thisType
                    else:
                        # type is inconsistent, remove column from
                        # consideration
                        del columnTypes[col]

        # finally, compare results against first row and "vote"
        # on whether it's a header
        hasHeader = 0
        for col, colType in columnTypes.items():
            if type(colType) == type(0): # it's a length
                if len(header[col]) != colType:
                    hasHeader += 1
                else:
                    hasHeader -= 1
            else: # attempt typecast
                try:
                    colType(header[col])
                except (ValueError, TypeError):
                    hasHeader += 1
                else:
                    hasHeader -= 1

        return hasHeader > 0