1"""Python code for reading AVRO files""" 

2 

3# This code is a modified version of the code at 

4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under 

5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0) 

6 

7import bz2 

8import json 

9import lzma 

10import zlib 

11from datetime import datetime, timezone 

12from decimal import Context 

13from io import BytesIO 

14from struct import error as StructError 

15from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict 

16from warnings import warn 

17 

18from .io.binary_decoder import BinaryDecoder 

19from .io.json_decoder import AvroJSONDecoder 

20from .logical_readers import LOGICAL_READERS 

21from .schema import ( 

22 extract_record_type, 

23 is_single_record_union, 

24 is_single_name_union, 

25 extract_logical_type, 

26 parse_schema, 

27) 

28from .types import Schema, AvroMessage, NamedSchemas 

29from ._read_common import ( 

30 SchemaResolutionError, 

31 MAGIC, 

32 SYNC_SIZE, 

33 HEADER_SCHEMA, 

34 missing_codec_lib, 

35) 

36from .const import NAMED_TYPES, AVRO_TYPES 

37 

38T = TypeVar("T") 

39 

40decimal_context = Context() 

41epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) 

42epoch_naive = datetime(1970, 1, 1) 

43 

44 

45def _default_named_schemas() -> Dict[str, NamedSchemas]: 

46 return {"writer": {}, "reader": {}} 

47 

48 

49def match_types(writer_type, reader_type, named_schemas): 

50 if isinstance(writer_type, list) or isinstance(reader_type, list): 

51 return True 

52 if isinstance(writer_type, dict) or isinstance(reader_type, dict): 

53 try: 

54 return match_schemas(writer_type, reader_type, named_schemas) 

55 except SchemaResolutionError: 

56 return False 

57 if writer_type == reader_type: 

58 return True 

59 # promotion cases 

60 elif writer_type == "int" and reader_type in ["long", "float", "double"]: 

61 return True 

62 elif writer_type == "long" and reader_type in ["float", "double"]: 

63 return True 

64 elif writer_type == "float" and reader_type == "double": 

65 return True 

66 elif writer_type == "string" and reader_type == "bytes": 

67 return True 

68 elif writer_type == "bytes" and reader_type == "string": 

69 return True 

70 writer_schema = named_schemas["writer"].get(writer_type) 

71 reader_schema = named_schemas["reader"].get(reader_type) 

72 if writer_schema is not None and reader_schema is not None: 

73 return match_types(writer_schema, reader_schema, named_schemas) 

74 return False 

75 

76 
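# A minimal sketch of the promotion rules above: primitive writer types may
# be promoted to wider reader types, but never narrowed.
#
#     schemas = _default_named_schemas()
#     assert match_types("int", "double", schemas)      # int -> double
#     assert match_types("bytes", "string", schemas)    # bytes -> string
#     assert not match_types("double", "int", schemas)  # no narrowing
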

def match_schemas(w_schema, r_schema, named_schemas):
    error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema, named_schemas):
                return schema
        else:
            raise SchemaResolutionError(error_msg)
    else:
        # Check for dicts as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"], named_schemas):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"], named_schemas):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                )

            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            r_aliases = r_schema.get("aliases", [])
            if (
                w_unqual_name == r_unqual_name
                or w_schema["name"] in r_aliases
                or w_unqual_name in r_aliases
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            if match_types(w_type, r_schema["name"], named_schemas):
                return r_schema["name"]
        elif match_types(w_type, r_type, named_schemas):
            return r_schema
    raise SchemaResolutionError(error_msg)


def read_null(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_null()


def skip_null(decoder, writer_schema=None, named_schemas=None):
    decoder.read_null()


def read_boolean(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_boolean()


def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    decoder.read_boolean()


def read_int(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_int()


def skip_int(decoder, writer_schema=None, named_schemas=None):
    decoder.read_int()


def read_long(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_long()


def skip_long(decoder, writer_schema=None, named_schemas=None):
    decoder.read_long()


def read_float(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_float()


def skip_float(decoder, writer_schema=None, named_schemas=None):
    decoder.read_float()


def read_double(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_double()


def skip_double(decoder, writer_schema=None, named_schemas=None):
    decoder.read_double()


def read_bytes(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_bytes()


def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    decoder.read_bytes()


def read_utf8(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_utf8(
        handle_unicode_errors=options.get("handle_unicode_errors", "strict")
    )


def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    decoder.read_utf8()


def read_fixed(
    decoder,
    writer_schema,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    size = writer_schema["size"]
    return decoder.read_fixed(size)


def skip_fixed(decoder, writer_schema, named_schemas=None):
    size = writer_schema["size"]
    decoder.read_fixed(size)


def read_enum(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    symbol = writer_schema["symbols"][decoder.read_enum()]
    if reader_schema and symbol not in reader_schema["symbols"]:
        default = reader_schema.get("default")
        if default:
            return default
        else:
            symlist = reader_schema["symbols"]
            msg = f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
            raise SchemaResolutionError(msg)
    return symbol


def skip_enum(decoder, writer_schema, named_schemas):
    decoder.read_enum()


def read_array(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema, options):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                r_schema["items"],
                options,
            )

    else:

        def item_reader(decoder, w_schema, r_schema, options):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                None,
                options,
            )

    read_items = []

    decoder.read_array_start()

    for item in decoder.iter_array():
        read_items.append(
            item_reader(
                decoder,
                writer_schema,
                reader_schema,
                options,
            )
        )

    decoder.read_array_end()

    return read_items


def skip_array(decoder, writer_schema, named_schemas):
    decoder.read_array_start()

    for item in decoder.iter_array():
        skip_data(decoder, writer_schema["items"], named_schemas)

    decoder.read_array_end()


def read_map(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                r_schema["values"],
                options,
            )

    else:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                None,
                options,
            )

    read_items = {}

    decoder.read_map_start()

    for item in decoder.iter_map():
        key = decoder.read_utf8()
        read_items[key] = item_reader(decoder, writer_schema, reader_schema)

    decoder.read_map_end()

    return read_items


def skip_map(decoder, writer_schema, named_schemas):
    decoder.read_map_start()

    for item in decoder.iter_map():
        decoder.read_utf8()
        skip_data(decoder, writer_schema["values"], named_schemas)

    decoder.read_map_end()


def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]
    idx_reader_schema = None

    if reader_schema:
        msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema, named_schemas):
                result = read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    options,
                )
            else:
                raise SchemaResolutionError(msg)
        else:
            for schema in reader_schema:
                if match_types(idx_schema, schema, named_schemas):
                    idx_reader_schema = schema
                    result = read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        options,
                    )
                    break
            else:
                raise SchemaResolutionError(msg)
    else:
        result = read_data(decoder, idx_schema, named_schemas, None, options)

    return_record_name_override = options.get("return_record_name_override")
    return_record_name = options.get("return_record_name")
    return_named_type_override = options.get("return_named_type_override")
    return_named_type = options.get("return_named_type")
    if return_named_type_override and is_single_name_union(writer_schema):
        return result
    elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
        schema_name = (
            idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
        )
        return (schema_name, result)
    elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        schema_name = (
            named_schemas["reader"][idx_reader_schema]["name"]
            if idx_reader_schema
            else named_schemas["writer"][idx_schema]["name"]
        )
        return (schema_name, result)
    elif return_record_name_override and is_single_record_union(writer_schema):
        return result
    elif return_record_name and extract_record_type(idx_schema) == "record":
        schema_name = (
            idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
        )
        return (schema_name, result)
    elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        schema_name = (
            named_schemas["reader"][idx_reader_schema]["name"]
            if idx_reader_schema
            else named_schemas["writer"][idx_schema]["name"]
        )
        return (schema_name, result)
    else:
        return result


def skip_union(decoder, writer_schema, named_schemas):
    # schema resolution
    index = decoder.read_index()
    skip_data(decoder, writer_schema[index], named_schemas)


def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and writer's schema does not have a field with the same name, then the
       reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                options,
            )
    else:
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                record[readers_field["name"]] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    options,
                )
            else:
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        if len(readers_field_dict) > len(record):
            writer_fields = [f["name"] for f in writer_schema["fields"]]
            for f_name, field in readers_field_dict.items():
                if f_name not in writer_fields and f_name not in record:
                    if "default" in field:
                        record[field["name"]] = field["default"]
                    else:
                        msg = f"No default value for field {field['name']} in {reader_schema['name']}"
                        raise SchemaResolutionError(msg)

    return record

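# Sketch of the schema-resolution rules documented in read_record: a reader
# schema may add a defaulted field that the writer never wrote. Assuming
# hypothetical writer/reader schemas:
#
#     writer = {"type": "record", "name": "Rec",
#               "fields": [{"name": "a", "type": "int"}]}
#     reader = {"type": "record", "name": "Rec",
#               "fields": [{"name": "a", "type": "int"},
#                          {"name": "b", "type": "string", "default": ""}]}
#
# a record written as {"a": 1} under `writer` reads back under `reader` as
# {"a": 1, "b": ""}; without a default on "b", SchemaResolutionError is raised.
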

def skip_record(decoder, writer_schema, named_schemas):
    for field in writer_schema["fields"]:
        skip_data(decoder, field["type"], named_schemas)


READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}

SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}


def maybe_promote(data, writer_type, reader_type):
    if writer_type == "int":
        # No need to promote to long since they are the same type in Python
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "long":
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data

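# For example, maybe_promote(1, "int", "double") returns 1.0, and
# maybe_promote(b"hi", "bytes", "string") returns "hi"; any pairing without a
# promotion rule returns the data unchanged.
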

def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read data from file object according to schema."""

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        reader_schema = match_schemas(
            writer_schema,
            reader_schema,
            named_schemas,
        )

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )
        except StructError:
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        if "logicalType" in writer_schema:
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            options,
        )


def skip_data(decoder, writer_schema, named_schemas):
    record_type = extract_record_type(writer_schema)

    reader_fn = SKIPS.get(record_type)
    if reader_fn:
        reader_fn(decoder, writer_schema, named_schemas)
    else:
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)


def skip_sync(fo, sync_marker):
    """Skip an expected sync marker, complaining if it doesn't match"""
    if fo.read(SYNC_SIZE) != sync_marker:
        raise ValueError("expected sync marker not found")


def null_read_block(decoder):
    """Read block in "null" codec."""
    return BytesIO(decoder.read_bytes())


def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    data = decoder.read_bytes()
    # -15 is the log of the window size; negative indicates "raw" (no
    # zlib headers) decompression. See zlib.h.
    return BytesIO(zlib.decompressobj(-15).decompress(data))


def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    data = decoder.read_bytes()
    return BytesIO(bz2.decompress(data))


def xz_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lzma.decompress(data))


BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}


def snappy_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length - 4)
    decoder.read_fixed(4)  # CRC
    return BytesIO(snappy_decompress(data))


try:
    from cramjam import snappy

    snappy_decompress = snappy.decompress_raw
except ImportError:
    try:
        import snappy

        snappy_decompress = snappy.decompress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_READERS["snappy"] = snappy_read_block
else:
    BLOCK_READERS["snappy"] = snappy_read_block


def zstandard_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(data))


try:
    import zstandard
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block


def lz4_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(data))


try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block

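# The optional codecs above all follow the same registration pattern: a block
# reader takes a decoder and returns a BytesIO of the decompressed block, and
# is installed by assignment into BLOCK_READERS. A hypothetical sketch for one
# more codec (`my_codec` and `my_decompress` are illustrative names, not part
# of the library):
#
#     def my_codec_read_block(decoder):
#         return BytesIO(my_decompress(decoder.read_bytes()))
#
#     BLOCK_READERS["my_codec"] = my_codec_read_block
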

def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            block_count = decoder.read_long()
        except EOFError:
            return

        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )

        skip_sync(decoder.fo, sync_marker)

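# The loop above mirrors the layout of an Avro object container file after the
# header: each block is a record count (a varint-encoded long), the block's
# (possibly compressed) bytes, and the file's sync marker of SYNC_SIZE bytes,
# repeated until EOF. _iter_avro_blocks below walks the same structure but
# yields the blocks themselves instead of decoding every record.
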

def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except EOFError:
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            options,
        )


class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        options,
    ):
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.options = options

    def __iter__(self):
        for i in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

    def __str__(self):
        return (
            f"Avro block: {len(self.bytes_)} bytes, "
            + f"{self.num_records} records, "
            + f"codec: {self.codec}, position {self.offset}+{self.size}"
        )

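# A short usage sketch: blocks from block_reader (defined below) are lazy and
# only decode their records when iterated.
#
#     with open("some-file.avro", "rb") as fo:
#         for block in block_reader(fo):
#             print(block)              # e.g. "Avro block: ... bytes, ..."
#             for record in block:      # records decoded on demand
#                 process_record(record)
#
# (`process_record` is a placeholder, as in the docstrings below.)
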

class file_reader(Generic[T]):
    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        options={},
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = _default_named_schemas()
        if reader_schema:
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )

        else:
            self.reader_schema = None
        self.options = options
        self._elems = None

    def _read_header(self):
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.options,
            )
        except EOFError:
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about
        # defaults might have been written with a bad default. Since we re-parse
        # the writer schema here, it will now fail. Therefore, if a user
        # provides a reader schema that passes parsing, we will ignore those
        # default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_schemas
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)


class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.options,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

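# Sketch of the return_record_name option described above, assuming a file
# whose schema is a union of two hypothetical records ["A", "B"]:
#
#     with open("some-file.avro", "rb") as fo:
#         for record in reader(fo, return_record_name=True):
#             name, fields = record    # e.g. ("A", {...}) or ("B", {...})
#
# With return_record_name_override=True as well, a union containing only a
# single record would yield the bare record instead of the tuple.
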

class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.options,
        )


def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    handle_unicode_errors: str = "strict",
    return_named_type: bool = False,
    return_named_type_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    decoder = BinaryDecoder(fo)

    options = {
        "return_record_name": return_record_name,
        "return_record_name_override": return_record_name_override,
        "handle_unicode_errors": handle_unicode_errors,
        "return_named_type": return_named_type,
        "return_named_type_override": return_named_type_override,
    }

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        options,
    )

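# Round-trip sketch for the schema-migration path described above: write with
# one schema, then read with a newer one (`old_schema`, `new_schema`, and
# `record` are hypothetical; schemaless_writer is the companion function in
# _write_py):
#
#     from io import BytesIO
#     from fastavro import schemaless_writer, schemaless_reader
#
#     fo = BytesIO()
#     schemaless_writer(fo, old_schema, record)
#     fo.seek(0)
#     migrated = schemaless_reader(fo, old_schema, new_schema)
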

def is_avro(path_or_buffer: Union[str, IO]) -> bool:
    """Return True if path (or buffer) points to an Avro file. This will only
    work for avro files that contain the normal avro schema header like those
    created from :func:`~fastavro._write_py.writer`. This function is not
    intended to be used with binary data created from
    :func:`~fastavro._write_py.schemaless_writer` since that does not include
    the avro header.

    Parameters
    ----------
    path_or_buffer
        Path to file
    """
    fp: IO
    if isinstance(path_or_buffer, str):
        fp = open(path_or_buffer, "rb")
        close = True
    else:
        fp = path_or_buffer
        close = False

    try:
        header = fp.read(len(MAGIC))
        return header == MAGIC
    finally:
        if close:
            fp.close()
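
# Usage sketch: is_avro only checks the leading magic bytes, so when given a
# buffer it reads (and consumes) len(MAGIC) bytes from the current position.
#
#     from io import BytesIO
#     buf = BytesIO(header_bytes)   # bytes of an object container file
#     if is_avro(buf):              # buf has now advanced past the magic
#         buf.seek(0)
#
# (`header_bytes` is a placeholder for real file contents.)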