"""Python code for reading AVRO files"""

# This code is a modified version of the code at
# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)

import bz2
import json
import lzma
import zlib
from datetime import datetime, timezone
from decimal import Context
from io import BytesIO
from struct import error as StructError
from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict
from warnings import warn

from .io.binary_decoder import BinaryDecoder
from .io.json_decoder import AvroJSONDecoder
from .logical_readers import LOGICAL_READERS
from .schema import (
    extract_record_type,
    is_single_record_union,
    is_single_name_union,
    extract_logical_type,
    parse_schema,
)
from .types import Schema, AvroMessage, NamedSchemas
from ._read_common import (
    SchemaResolutionError,
    MAGIC,
    SYNC_SIZE,
    HEADER_SCHEMA,
    missing_codec_lib,
)
from .const import NAMED_TYPES, AVRO_TYPES

T = TypeVar("T")

decimal_context = Context()
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
epoch_naive = datetime(1970, 1, 1)


def _default_named_schemas() -> Dict[str, NamedSchemas]:
    return {"writer": {}, "reader": {}}


def match_types(writer_type, reader_type, named_schemas):
    if isinstance(writer_type, list) or isinstance(reader_type, list):
        return True
    if isinstance(writer_type, dict) or isinstance(reader_type, dict):
        try:
            return match_schemas(writer_type, reader_type, named_schemas)
        except SchemaResolutionError:
            return False
    if writer_type == reader_type:
        return True
    # promotion cases
    elif writer_type == "int" and reader_type in ["long", "float", "double"]:
        return True
    elif writer_type == "long" and reader_type in ["float", "double"]:
        return True
    elif writer_type == "float" and reader_type == "double":
        return True
    elif writer_type == "string" and reader_type == "bytes":
        return True
    elif writer_type == "bytes" and reader_type == "string":
        return True
    writer_schema = named_schemas["writer"].get(writer_type)
    reader_schema = named_schemas["reader"].get(reader_type)
    if writer_schema is not None and reader_schema is not None:
        return match_types(writer_schema, reader_schema, named_schemas)
    return False
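
# A minimal sketch of the promotion rules above (illustrative only; assumes
# empty named-schema maps):
#
#     named = _default_named_schemas()
#     match_types("int", "double", named)    # True: int promotes to double
#     match_types("double", "int", named)    # False: no demotion
#     match_types("bytes", "string", named)  # True: bytes/string interchange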


def match_schemas(w_schema, r_schema, named_schemas):
    error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema, named_schemas):
                return schema
        else:
            raise SchemaResolutionError(error_msg)
    else:
        # Check for dicts, as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"], named_schemas):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"], named_schemas):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                )

            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            r_aliases = r_schema.get("aliases", [])
            if (
                w_unqual_name == r_unqual_name
                or w_schema["name"] in r_aliases
                or w_unqual_name in r_aliases
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            if match_types(w_type, r_schema["name"], named_schemas):
                return r_schema["name"]
        elif match_types(w_type, r_type, named_schemas):
            return r_schema
        raise SchemaResolutionError(error_msg)
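
# Illustrative sketch of reader-union resolution (hypothetical schemas):
#
#     named = _default_named_schemas()
#     match_schemas("int", ["null", "long"], named)   # returns "long"
#     match_schemas("int", ["null", "bytes"], named)  # raises SchemaResolutionError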


def read_null(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_null()


def skip_null(decoder, writer_schema=None, named_schemas=None):
    decoder.read_null()


def read_boolean(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_boolean()


def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    decoder.read_boolean()


def read_int(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_int()


def skip_int(decoder, writer_schema=None, named_schemas=None):
    decoder.read_int()


def read_long(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_long()


def skip_long(decoder, writer_schema=None, named_schemas=None):
    decoder.read_long()


def read_float(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_float()


def skip_float(decoder, writer_schema=None, named_schemas=None):
    decoder.read_float()


def read_double(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_double()


def skip_double(decoder, writer_schema=None, named_schemas=None):
    decoder.read_double()


def read_bytes(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_bytes()


def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    decoder.read_bytes()


def read_utf8(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_utf8(
        handle_unicode_errors=options.get("handle_unicode_errors", "strict")
    )


def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    decoder.read_utf8()


def read_fixed(
    decoder,
    writer_schema,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    size = writer_schema["size"]
    return decoder.read_fixed(size)


def skip_fixed(decoder, writer_schema, named_schemas=None):
    size = writer_schema["size"]
    decoder.read_fixed(size)


def read_enum(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    symbol = writer_schema["symbols"][decoder.read_enum()]
    if reader_schema and symbol not in reader_schema["symbols"]:
        default = reader_schema.get("default")
        if default:
            return default
        else:
            symlist = reader_schema["symbols"]
            msg = f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
            raise SchemaResolutionError(msg)
    return symbol
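
# Illustrative sketch of the enum fallback above (hypothetical schemas):
#
#     writer = {"type": "enum", "name": "E", "symbols": ["A", "B"]}
#     reader = {"type": "enum", "name": "E", "symbols": ["A"], "default": "A"}
#
# An encoded "B" resolves to the reader default "A"; without a default it
# raises SchemaResolutionError instead.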


def skip_enum(decoder, writer_schema, named_schemas):
    decoder.read_enum()


def read_array(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema, options):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                r_schema["items"],
                options,
            )

    else:

        def item_reader(decoder, w_schema, r_schema, options):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                None,
                options,
            )

    read_items = []

    decoder.read_array_start()

    for item in decoder.iter_array():
        read_items.append(
            item_reader(
                decoder,
                writer_schema,
                reader_schema,
                options,
            )
        )

    decoder.read_array_end()

    return read_items


def skip_array(decoder, writer_schema, named_schemas):
    decoder.read_array_start()

    for item in decoder.iter_array():
        skip_data(decoder, writer_schema["items"], named_schemas)

    decoder.read_array_end()


def read_map(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                r_schema["values"],
                options,
            )

    else:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                None,
                options,
            )

    read_items = {}

    decoder.read_map_start()

    for item in decoder.iter_map():
        key = decoder.read_utf8()
        read_items[key] = item_reader(decoder, writer_schema, reader_schema)

    decoder.read_map_end()

    return read_items


def skip_map(decoder, writer_schema, named_schemas):
    decoder.read_map_start()

    for item in decoder.iter_map():
        decoder.read_utf8()
        skip_data(decoder, writer_schema["values"], named_schemas)

    decoder.read_map_end()


def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]

    if reader_schema:
        msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema, named_schemas):
                result = read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    options,
                )
            else:
                raise SchemaResolutionError(msg)
        else:
            for schema in reader_schema:
                if match_types(idx_schema, schema, named_schemas):
                    result = read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        options,
                    )
                    break
            else:
                raise SchemaResolutionError(msg)
    else:
        result = read_data(decoder, idx_schema, named_schemas, None, options)

    return_record_name_override = options.get("return_record_name_override")
    return_record_name = options.get("return_record_name")
    return_named_type_override = options.get("return_named_type_override")
    return_named_type = options.get("return_named_type")
    if return_named_type_override and is_single_name_union(writer_schema):
        return result
    elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
        return (idx_schema["name"], result)
    elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        return (named_schemas["writer"][idx_schema]["name"], result)
    elif return_record_name_override and is_single_record_union(writer_schema):
        return result
    elif return_record_name and extract_record_type(idx_schema) == "record":
        return (idx_schema["name"], result)
    elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        return (named_schemas["writer"][idx_schema]["name"], result)
    else:
        return result
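
# Illustrative sketch of the record-name options above (hypothetical union):
#
#     writer_schema = ["null", {"type": "record", "name": "Rec", "fields": [...]}]
#
# With return_record_name=True a non-null value comes back as ("Rec", {...})
# instead of {...}; return_record_name_override drops the tuple again
# because "Rec" is the only record in this union.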


def skip_union(decoder, writer_schema, named_schemas):
    # schema resolution
    index = decoder.read_index()
    skip_data(decoder, writer_schema[index], named_schemas)


def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and the writer's schema does not have a field with the same name, then
       the reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       the writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                options,
            )
    else:
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                record[readers_field["name"]] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    options,
                )
            else:
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        if len(readers_field_dict) > len(record):
            writer_fields = [f["name"] for f in writer_schema["fields"]]
            for f_name, field in readers_field_dict.items():
                if f_name not in writer_fields and f_name not in record:
                    if "default" in field:
                        record[field["name"]] = field["default"]
                    else:
                        msg = f"No default value for field {field['name']} in {reader_schema['name']}"
                        raise SchemaResolutionError(msg)

    return record
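
# Illustrative sketch of default filling (hypothetical schemas): if the
# writer's record has fields [a] and the reader's record has fields [a, b]
# where b declares "default": 0, the result is {"a": ..., "b": 0}; if b had
# no default, SchemaResolutionError would be raised instead.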


def skip_record(decoder, writer_schema, named_schemas):
    for field in writer_schema["fields"]:
        skip_data(decoder, field["type"], named_schemas)


READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}

SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}
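
# These dispatch tables drive read_data/skip_data below: the writer schema's
# extract_record_type(...) result keys into them, and any type not present
# falls through to a lookup of the named schemas collected during parsing.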


def maybe_promote(data, writer_type, reader_type):
    if writer_type == "int":
        # No need to promote to long since they are the same type in Python
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "long":
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data
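
# Minimal sketch of the promotions above (illustrative):
#
#     maybe_promote(1, "int", "double")        # 1.0
#     maybe_promote("hi", "string", "bytes")   # b"hi"
#     maybe_promote(b"hi", "bytes", "string")  # "hi"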


def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read data from file object according to schema."""

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        reader_schema = match_schemas(
            writer_schema,
            reader_schema,
            named_schemas,
        )

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )
        except StructError:
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        if "logicalType" in writer_schema:
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            options,
        )


def skip_data(decoder, writer_schema, named_schemas):
    record_type = extract_record_type(writer_schema)

    reader_fn = SKIPS.get(record_type)
    if reader_fn:
        reader_fn(decoder, writer_schema, named_schemas)
    else:
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)


def skip_sync(fo, sync_marker):
    """Skip an expected sync marker, complaining if it doesn't match"""
    if fo.read(SYNC_SIZE) != sync_marker:
        raise ValueError("expected sync marker not found")


def null_read_block(decoder):
    """Read block in "null" codec."""
    return BytesIO(decoder.read_bytes())


def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    data = decoder.read_bytes()
    # -15 is the log of the window size; negative indicates "raw" (no
    # zlib headers) decompression. See zlib.h.
    return BytesIO(zlib.decompressobj(-15).decompress(data))


def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    data = decoder.read_bytes()
    return BytesIO(bz2.decompress(data))


def xz_read_block(decoder):
    """Read block in "xz" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lzma.decompress(data))


BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}


def snappy_read_block(decoder):
    """Read block in "snappy" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length - 4)
    decoder.read_fixed(4)  # CRC
    return BytesIO(snappy_decompress(data))


try:
    from cramjam import snappy

    snappy_decompress = snappy.decompress_raw
except ImportError:
    try:
        import snappy

        snappy_decompress = snappy.decompress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_READERS["snappy"] = snappy_read_block
else:
    BLOCK_READERS["snappy"] = snappy_read_block
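
# Note on the "snappy" codec layout (per the Avro spec): each compressed
# block is followed by the 4-byte, big-endian CRC32 of the uncompressed
# data, which is why snappy_read_block reads length - 4 bytes of data and
# then skips 4 bytes (the checksum is read but not verified here).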


def zstandard_read_block(decoder):
    """Read block in "zstandard" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(data))


try:
    import zstandard
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block


def lz4_read_block(decoder):
    """Read block in "lz4" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(data))


try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block


def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            block_count = decoder.read_long()
        except EOFError:
            return

        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )

        skip_sync(decoder.fo, sync_marker)


def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except EOFError:
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            options,
        )


class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        options,
    ):
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.options = options

    def __iter__(self):
        for i in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

    def __str__(self):
        return (
            f"Avro block: {len(self.bytes_)} bytes, "
            + f"{self.num_records} records, "
            + f"codec: {self.codec}, position {self.offset}+{self.size}"
        )


class file_reader(Generic[T]):
    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        options={},
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = _default_named_schemas()
        if reader_schema:
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )
        else:
            self.reader_schema = None
        self.options = options
        self._elems = None

    def _read_header(self):
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.options,
            )
        except EOFError:
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about defaults
        # might have been written with a bad default. Since we re-parse the
        # writer schema here, it would now fail. Therefore, if a user provides
        # a reader schema that passes parsing, we will ignore those default
        # errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_schemas
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)


class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.options,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )


class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.options,
        )


def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    handle_unicode_errors: str = "strict",
    return_named_type: bool = False,
    return_named_type_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    decoder = BinaryDecoder(fo)

    options = {
        "return_record_name": return_record_name,
        "return_record_name_override": return_record_name_override,
        "handle_unicode_errors": handle_unicode_errors,
        "return_named_type": return_named_type,
        "return_named_type_override": return_named_type_override,
    }

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        options,
    )

1281 

1282def is_avro(path_or_buffer: Union[str, IO]) -> bool: 

1283 """Return True if path (or buffer) points to an Avro file. This will only 

1284 work for avro files that contain the normal avro schema header like those 

1285 create from :func:`~fastavro._write_py.writer`. This function is not intended 

1286 to be used with binary data created from 

1287 :func:`~fastavro._write_py.schemaless_writer` since that does not include the 

1288 avro header. 

1289 

1290 Parameters 

1291 ---------- 

1292 path_or_buffer 

1293 Path to file 

1294 """ 

1295 fp: IO 

1296 if isinstance(path_or_buffer, str): 

1297 fp = open(path_or_buffer, "rb") 

1298 close = True 

1299 else: 

1300 fp = path_or_buffer 

1301 close = False 

1302 

1303 try: 

1304 header = fp.read(len(MAGIC)) 

1305 return header == MAGIC 

1306 finally: 

1307 if close: 

1308 fp.close()
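

# Minimal usage sketch (illustrative; "weather.avro" is a hypothetical file):
#
#     from fastavro import is_avro
#
#     if is_avro("weather.avro"):
#         ...  # safe to open with fastavro.reader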