Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/_read_py.py: 23%

416 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# cython: auto_cpdef=True 

2 

3"""Python code for reading AVRO files""" 

4 

5# This code is a modified version of the code at 

6# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under 

7# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0) 

8 

9import bz2 

10import json 

11import lzma 

12import zlib 

13from datetime import datetime, timezone 

14from decimal import Context 

15from io import BytesIO 

16from struct import error as StructError 

17from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict 

18 

19from .io.binary_decoder import BinaryDecoder 

20from .io.json_decoder import AvroJSONDecoder 

21from .logical_readers import LOGICAL_READERS 

22from .schema import ( 

23 extract_record_type, 

24 is_nullable_union, 

25 extract_logical_type, 

26 parse_schema, 

27) 

28from .types import Schema, AvroMessage, NamedSchemas 

29from ._read_common import ( 

30 SchemaResolutionError, 

31 MAGIC, 

32 SYNC_SIZE, 

33 HEADER_SCHEMA, 

34 missing_codec_lib, 

35) 

36from .const import NAMED_TYPES, AVRO_TYPES 

37 

38T = TypeVar("T") 

39 

40decimal_context = Context() 

41epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) 

42epoch_naive = datetime(1970, 1, 1) 

43 

44 

def match_types(writer_type, reader_type):
    """Return True if a value written as *writer_type* may be read as *reader_type*."""
    # Unions are resolved later (in read_union) once the branch is known,
    # so treat any union as potentially compatible here.
    if isinstance(writer_type, list) or isinstance(reader_type, list):
        return True
    # Complex (dict) schemas defer to the full resolution logic.
    if isinstance(writer_type, dict) or isinstance(reader_type, dict):
        try:
            return match_schemas(writer_type, reader_type)
        except SchemaResolutionError:
            return False
    if writer_type == reader_type:
        return True
    # Promotions permitted between primitive types.
    promotions = {
        "int": ("long", "float", "double"),
        "long": ("float", "double"),
        "float": ("double",),
        "string": ("bytes",),
        "bytes": ("string",),
    }
    return reader_type in promotions.get(writer_type, ())

67 

68 

def match_schemas(w_schema, r_schema):
    """Resolve the writer schema against the reader schema.

    Returns the reader schema (or matching union branch / named-type name)
    to use when decoding; raises SchemaResolutionError when the two schemas
    are incompatible.
    """
    error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema):
                return schema
        else:
            # for/else: no reader branch matched the writer schema.
            raise SchemaResolutionError(error_msg)
    else:
        # Check for dicts as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"]):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"]):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            # Fixed schemas must agree on size exactly; there is no promotion.
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                )

            # Named types match on unqualified name, or via the reader's aliases.
            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            if w_unqual_name == r_unqual_name or w_schema["name"] in r_schema.get(
                "aliases", []
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            # w_type is a string reference to a named schema; compare by name.
            if match_types(w_type, r_schema["name"]):
                return r_schema["name"]
        elif match_types(w_type, r_type):
            return r_schema
        # Falls through here when a branch above did not return.
        raise SchemaResolutionError(error_msg)

118 

119 

def read_null(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a null value.

    The unused parameters keep the signature uniform for the READERS
    dispatch table.
    """
    return decoder.read_null()


def skip_null(decoder, writer_schema=None, named_schemas=None):
    """Skip over a null value."""
    decoder.read_null()


def read_boolean(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a boolean value."""
    return decoder.read_boolean()


def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    """Skip over a boolean value."""
    decoder.read_boolean()


def read_int(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read an int value."""
    return decoder.read_int()


def skip_int(decoder, writer_schema=None, named_schemas=None):
    """Skip over an int value."""
    decoder.read_int()


def read_long(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a long value."""
    return decoder.read_long()


def skip_long(decoder, writer_schema=None, named_schemas=None):
    """Skip over a long value."""
    decoder.read_long()


def read_float(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a float value."""
    return decoder.read_float()


def skip_float(decoder, writer_schema=None, named_schemas=None):
    """Skip over a float value."""
    decoder.read_float()


def read_double(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a double value."""
    return decoder.read_double()


def skip_double(decoder, writer_schema=None, named_schemas=None):
    """Skip over a double value."""
    decoder.read_double()


def read_bytes(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a bytes value."""
    return decoder.read_bytes()


def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    """Skip over a bytes value."""
    decoder.read_bytes()


def read_utf8(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a string value."""
    return decoder.read_utf8()


def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    """Skip over a string value."""
    decoder.read_utf8()


def read_fixed(
    decoder,
    writer_schema,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a fixed value: exactly the schema's declared number of bytes."""
    size = writer_schema["size"]
    return decoder.read_fixed(size)


def skip_fixed(decoder, writer_schema, named_schemas=None):
    """Skip over a fixed value by consuming its declared size."""
    size = writer_schema["size"]
    decoder.read_fixed(size)

255 

256 

def read_enum(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Decode an enum index and map it to the writer's symbol, applying
    reader-schema resolution when a reader schema is supplied.
    """
    index = decoder.read_enum()
    symbol = writer_schema["symbols"][index]
    if not reader_schema or symbol in reader_schema["symbols"]:
        return symbol
    # Writer symbol is unknown to the reader: fall back to its default, if any.
    default = reader_schema.get("default")
    if default:
        return default
    symlist = reader_schema["symbols"]
    raise SchemaResolutionError(f"{symbol} not found in reader symbol list {symlist}")


def skip_enum(decoder, writer_schema, named_schemas):
    """Consume an enum value without mapping it to a symbol."""
    decoder.read_enum()

279 

280 

def read_array(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read an avro array and return it as a Python list.

    Items are decoded with the writer's item schema, resolved against the
    reader's item schema when one was provided.
    """
    # The original defined two nearly identical item-reader closures that
    # differed only in the reader item schema; resolve the schemas once
    # up front instead, hoisting the lookups out of the loop.
    writer_items = writer_schema["items"]
    reader_items = reader_schema["items"] if reader_schema else None

    read_items = []

    decoder.read_array_start()

    for _ in decoder.iter_array():
        read_items.append(
            read_data(
                decoder,
                writer_items,
                named_schemas,
                reader_items,
                return_record_name,
                return_record_name_override,
            )
        )

    decoder.read_array_end()

    return read_items

335 

336 

def skip_array(decoder, writer_schema, named_schemas):
    """Advance past an entire array value without materializing its items."""
    decoder.read_array_start()

    # The loop variable is only a pacing signal; each iteration skips one item.
    for _ in decoder.iter_array():
        skip_data(decoder, writer_schema["items"], named_schemas)

    decoder.read_array_end()

344 

345 

def read_map(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read an avro map and return it as a Python dict.

    Keys are always strings; each value is decoded with the writer's value
    schema, resolved against the reader's value schema when one was provided.
    """
    # The original defined two nearly identical item-reader closures that
    # differed only in the reader value schema; resolve the schemas once
    # up front instead, hoisting the lookups out of the loop.
    writer_values = writer_schema["values"]
    reader_values = reader_schema["values"] if reader_schema else None

    read_items = {}

    decoder.read_map_start()

    for _ in decoder.iter_map():
        key = decoder.read_utf8()
        read_items[key] = read_data(
            decoder,
            writer_values,
            named_schemas,
            reader_values,
            return_record_name,
            return_record_name_override,
        )

    decoder.read_map_end()

    return read_items

389 

390 

def skip_map(decoder, writer_schema, named_schemas):
    """Advance past a map value: discard each key, skip each value."""
    decoder.read_map_start()

    for _ in decoder.iter_map():
        decoder.read_utf8()
        skip_data(decoder, writer_schema["values"], named_schemas)

    decoder.read_map_end()

399 

400 

def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read a union value: a branch index followed by the value encoded with
    that branch's schema. Returns either the bare value or, depending on the
    return_record_name flags, a (name, value) tuple.
    """
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]

    if reader_schema:
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema):
                return read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    return_record_name,
                    return_record_name_override,
                )
        else:
            # Use the first reader branch compatible with the written branch.
            for schema in reader_schema:
                if match_types(idx_schema, schema):
                    return read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        return_record_name,
                        return_record_name_override,
                    )
        # No compatible reader branch was found above.
        msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
        raise SchemaResolutionError(msg)
    else:
        if return_record_name_override and is_nullable_union(writer_schema):
            # Nullable single-record unions return the bare value, not a tuple.
            return read_data(
                decoder,
                idx_schema,
                named_schemas,
                None,
                return_record_name,
                return_record_name_override,
            )
        elif return_record_name and extract_record_type(idx_schema) == "record":
            # Tag the value with its record name so callers can distinguish
            # which union branch was read.
            return (
                idx_schema["name"],
                read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    None,
                    return_record_name,
                    return_record_name_override,
                ),
            )
        elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
            # idx_schema is a named type
            return (
                named_schemas["writer"][idx_schema]["name"],
                read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    None,
                    return_record_name,
                    return_record_name_override,
                ),
            )
        else:
            return read_data(decoder, idx_schema, named_schemas)

475 

476 

def skip_union(decoder, writer_schema, named_schemas):
    """Skip a union value: read the branch index, then skip that branch."""
    # schema resolution
    index = decoder.read_index()
    skip_data(decoder, writer_schema[index], named_schemas)

481 

482 

def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and writer's schema does not have a field with the same name, then the
       reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        # Fast path: no resolution needed, read fields in declaration order.
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                return_record_name,
                return_record_name_override,
            )
    else:
        # Index the reader's fields by name and by alias for O(1) matching.
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                record[readers_field["name"]] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    return_record_name,
                    return_record_name_override,
                )
            else:
                # Writer field unknown to the reader: consume and discard it.
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        if len(readers_field_dict) > len(record):
            writer_fields = [f["name"] for f in writer_schema["fields"]]
            for f_name, field in readers_field_dict.items():
                if f_name not in writer_fields and f_name not in record:
                    if "default" in field:
                        record[field["name"]] = field["default"]
                    else:
                        msg = f'No default value for {field["name"]}'
                        raise SchemaResolutionError(msg)

    return record

557 

558 

def skip_record(decoder, writer_schema, named_schemas):
    """Skip a record by skipping each of its fields in declaration order."""
    for field in writer_schema["fields"]:
        skip_data(decoder, field["type"], named_schemas)

562 

563 

# Dispatch table: avro record type -> reader function (used by read_data).
READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}

# Dispatch table: avro record type -> skip function (decode and discard).
SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}

603 

604 

def maybe_promote(data, writer_type, reader_type):
    """Apply avro primitive-type promotion to *data* when the reader type differs."""
    # int/long promote to either float type. No int->long promotion is needed
    # since both map to the same Python int type.
    if writer_type in ("int", "long") and reader_type in ("float", "double"):
        return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data

618 

619 

def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read data from file object according to schema.

    Central dispatch: looks up the reader for the writer's record type,
    applies logical-type conversion and primitive promotion, and recursively
    dereferences named schema references.
    """

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        # Narrows reader_schema to the compatible schema/branch; raises
        # SchemaResolutionError when the schemas cannot be matched.
        reader_schema = match_schemas(writer_schema, reader_schema)

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                return_record_name,
                return_record_name_override,
            )
        except StructError:
            # Translate a short read at the struct level into EOF for callers.
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        if "logicalType" in writer_schema:
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                # Unknown logical types are ignored; the raw value falls through.
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        # record_type is the name of a named schema: dereference and retry.
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            return_record_name,
            return_record_name_override,
        )

668 

669 

def skip_data(decoder, writer_schema, named_schemas):
    """Advance the decoder past one value of writer_schema without decoding it."""
    record_type = extract_record_type(writer_schema)

    reader_fn = SKIPS.get(record_type)
    if reader_fn:
        reader_fn(decoder, writer_schema, named_schemas)
    else:
        # record_type is the name of a named schema: dereference and retry.
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)

678 

679 

def skip_sync(fo, sync_marker):
    """Skip an expected sync marker, complaining if it doesn't match"""
    # A mismatch means the stream is corrupt or the reader is mis-positioned.
    if fo.read(SYNC_SIZE) != sync_marker:
        raise ValueError("expected sync marker not found")

684 

685 

def null_read_block(decoder):
    """Read block in "null" codec."""
    return BytesIO(decoder.read_bytes())


def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    data = decoder.read_bytes()
    # -15 is the log of the window size; negative indicates "raw" (no
    # zlib headers) decompression. See zlib.h.
    return BytesIO(zlib.decompressobj(-15).decompress(data))


def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    data = decoder.read_bytes()
    return BytesIO(bz2.decompress(data))


def xz_read_block(decoder):
    """Read block in "xz" codec: a length-prefixed lzma stream."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lzma.decompress(data))

709 

710 

# Dispatch table: codec name -> function returning a BytesIO over the
# decompressed block. Optional codecs are registered below if their
# libraries import successfully; otherwise a helpful error stub is used.
BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}


def snappy_read_block(decoder):
    """Read block in "snappy" codec: length-prefixed, with a trailing 4-byte CRC."""
    length = read_long(decoder)
    data = decoder.read_fixed(length - 4)
    decoder.read_fixed(4)  # CRC
    return BytesIO(snappy.decompress(data))


try:
    import snappy
except ImportError:
    BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "python-snappy")
else:
    BLOCK_READERS["snappy"] = snappy_read_block


def zstandard_read_block(decoder):
    """Read block in "zstandard" codec: a length-prefixed zstd frame."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(zstd.ZstdDecompressor().decompressobj().decompress(data))


try:
    import zstandard as zstd
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block


def lz4_read_block(decoder):
    """Read block in "lz4" codec: a length-prefixed lz4 block."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(data))


try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block

760 

761 

def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    return_record_name=False,
    return_record_name_override=False,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            block_count = decoder.read_long()
        except StopIteration:
            # Decoder exhausted: no more blocks in the file.
            return

        # Decompress the whole block into an in-memory buffer.
        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                return_record_name,
                return_record_name_override,
            )

        # Each block is followed by the file's 16-byte sync marker.
        skip_sync(decoder.fo, sync_marker)

799 

800 

def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    return_record_name=False,
    return_record_name_override=False,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        # Remember where this block starts so its offset/size can be reported.
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except StopIteration:
            # Decoder exhausted: no more blocks in the file.
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            return_record_name,
            return_record_name_override,
        )

843 

844 

class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        return_record_name=False,
        return_record_name_override=False,
    ):
        # bytes_ is the already-decompressed block payload (a file-like object).
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.return_record_name = return_record_name
        self.return_record_name_override = return_record_name_override

    def __iter__(self):
        # Decode each record lazily from the block's in-memory payload.
        for i in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.return_record_name,
                self.return_record_name_override,
            )

    def __str__(self):
        return (
            f"Avro block: {len(self.bytes_)} bytes, "
            + f"{self.num_records} records, "
            + f"codec: {self.codec}, position {self.offset}+{self.size}"
        )

910 

911 

class file_reader(Generic[T]):
    """Base class for container-file readers: parses the header and exposes
    an iterator over the elements produced by a subclass.
    """

    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        return_record_name=False,
        return_record_name_override=False,
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = {"writer": {}, "reader": {}}
        if reader_schema:
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )

        else:
            self.reader_schema = None
        self.return_record_name = return_record_name
        self.return_record_name_override = return_record_name_override
        # Subclasses replace this with the record/block iterator.
        self._elems = None

    def _read_header(self):
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.return_record_name,
                self.return_record_name_override,
            )
        except (StopIteration, EOFError):
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about
        # defaults might have been writen with a bad default. Since we re-parse
        # the writer schema here, it will now fail. Therefore, if a user
        # provides a reader schema that passes parsing, we will ignore those
        # default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_types
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        # Deprecated alias kept for backwards compatibility.
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        # Subclasses must have populated self._elems in __init__.
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)

994 

995 

class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
    ):
        super().__init__(
            fo, reader_schema, return_record_name, return_record_name_override
        )

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            # JSON input carries no embedded writer schema, so the provided
            # schema plays the writer role and no resolution is performed.
            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.return_record_name,
                        self.return_record_name_override,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.return_record_name,
                self.return_record_name_override,
            )

1100 

class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
    ):
        super().__init__(
            fo, reader_schema, return_record_name, return_record_name_override
        )

        self._read_header()

        # Yield whole blocks rather than individual records.
        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.return_record_name,
            self.return_record_name_override,
        )

1170 

def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = {"writer": {}, "reader": {}}
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    # Schemaless data has no header, so it is always binary-encoded.
    decoder = BinaryDecoder(fo)

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        return_record_name,
        return_record_name_override,
    )

1229 

1230 

def is_avro(path_or_buffer: Union[str, IO]) -> bool:
    """Return True if path (or buffer) points to an Avro file. This will only
    work for avro files that contain the normal avro schema header like those
    create from :func:`~fastavro._write_py.writer`. This function is not intended
    to be used with binary data created from
    :func:`~fastavro._write_py.schemaless_writer` since that does not include the
    avro header.

    Parameters
    ----------
    path_or_buffer
        Path to file
    """
    if isinstance(path_or_buffer, str):
        # We opened the file, so we are responsible for closing it.
        with open(path_or_buffer, "rb") as fp:
            return fp.read(len(MAGIC)) == MAGIC
    # Caller owns the buffer: read from the current position, leave it open.
    return path_or_buffer.read(len(MAGIC)) == MAGIC