Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastavro/_read_py.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

454 statements  

1"""Python code for reading AVRO files""" 

2 

3# This code is a modified version of the code at 

4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under 

5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0) 

6 

7import bz2 

8import json 

9import lzma 

10import zlib 

11from datetime import datetime, timezone 

12from decimal import Context 

13from io import BytesIO 

14from struct import error as StructError 

15from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict 

16from warnings import warn 

17 

18from .io.binary_decoder import BinaryDecoder 

19from .io.json_decoder import AvroJSONDecoder 

20from .logical_readers import LOGICAL_READERS 

21from .schema import ( 

22 extract_record_type, 

23 is_single_record_union, 

24 is_single_name_union, 

25 extract_logical_type, 

26 parse_schema, 

27) 

28from .types import Schema, AvroMessage, NamedSchemas 

29from ._read_common import ( 

30 SchemaResolutionError, 

31 MAGIC, 

32 SYNC_SIZE, 

33 HEADER_SCHEMA, 

34 missing_codec_lib, 

35) 

36from .const import NAMED_TYPES, AVRO_TYPES 

37 

# Record type yielded by file_reader subclasses (used as Generic[T] below).
T = TypeVar("T")

# Shared decimal context with default precision/rounding.
decimal_context = Context()
# Epoch reference points: timezone-aware and naive variants.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
epoch_naive = datetime(1970, 1, 1)

43 

44 

45def _default_named_schemas() -> Dict[str, NamedSchemas]: 

46 return {"writer": {}, "reader": {}} 

47 

48 

def match_types(writer_type, reader_type, named_schemas):
    """Return True when data written as *writer_type* is readable as *reader_type*.

    Unions defer to later per-branch checks; dict schemas delegate to
    :func:`match_schemas`; primitive names use Avro's promotion rules; any
    remaining strings are treated as references into *named_schemas*.
    """
    if isinstance(writer_type, list) or isinstance(reader_type, list):
        # A union on either side: the exact branch check happens later.
        return True
    if isinstance(writer_type, dict) or isinstance(reader_type, dict):
        # Complex schema objects: run full resolution without raising.
        matched = match_schemas(
            writer_type, reader_type, named_schemas, raise_on_error=False
        )
        return matched is not None
    if writer_type == reader_type:
        return True
    # Avro promotion rules for primitives (writer -> allowed readers).
    promotions = {
        "int": ("long", "float", "double"),
        "long": ("float", "double"),
        "float": ("double",),
        "string": ("bytes",),
        "bytes": ("string",),
    }
    if reader_type in promotions.get(writer_type, ()):
        return True
    # Otherwise both strings may name previously-registered named schemas.
    writer_schema = named_schemas["writer"].get(writer_type)
    reader_schema = named_schemas["reader"].get(reader_type)
    if writer_schema is not None and reader_schema is not None:
        return match_types(writer_schema, reader_schema, named_schemas)
    return False

75 

76 

def match_schemas(w_schema, r_schema, named_schemas, raise_on_error=True):
    """Resolve the writer schema *w_schema* against the reader schema *r_schema*.

    Returns the reader schema (or, for a named-type reference, the reader
    type name) to use when decoding. On mismatch, raises
    SchemaResolutionError when raise_on_error is True, else returns None.
    """
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema, named_schemas):
                return schema
        else:
            # No branch of the reader union matched the writer schema.
            if raise_on_error:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} is not {r_schema}"
                )
            else:
                return None
    else:
        # Check for dicts as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        # NOTE: each branch below returns only on success; a failed check
        # falls through to the mismatch handling at the bottom.
        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"], named_schemas):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"], named_schemas):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            # Fixed schemas must agree on size exactly.
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                if raise_on_error:
                    raise SchemaResolutionError(
                        f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                    )
                else:
                    return None

            # Named schemas match on unqualified name, or via reader aliases
            # (full or unqualified writer name).
            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            r_aliases = r_schema.get("aliases", [])
            if (
                w_unqual_name == r_unqual_name
                or w_schema["name"] in r_aliases
                or w_unqual_name in r_aliases
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            # Writer type is a reference to a named schema; match it against
            # the reader's name and return the name for later lookup.
            if match_types(w_type, r_schema["name"], named_schemas):
                return r_schema["name"]
        elif match_types(w_type, r_type, named_schemas):
            return r_schema
        if raise_on_error:
            raise SchemaResolutionError(
                f"Schema mismatch: {w_schema} is not {r_schema}"
            )
        else:
            return None

141 

142 

def read_null(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a null value (always None)."""
    return decoder.read_null()


def skip_null(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a null value."""
    decoder.read_null()


def read_boolean(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a boolean value."""
    return decoder.read_boolean()


def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a boolean value."""
    decoder.read_boolean()


def read_int(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode an int value."""
    return decoder.read_int()


def skip_int(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard an int value."""
    decoder.read_int()


def read_long(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a long value."""
    return decoder.read_long()


def skip_long(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a long value."""
    decoder.read_long()


def read_float(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a float value."""
    return decoder.read_float()


def skip_float(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a float value."""
    decoder.read_float()


def read_double(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a double value."""
    return decoder.read_double()


def skip_double(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a double value."""
    decoder.read_double()

225 

226 

def read_bytes(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a bytes value."""
    return decoder.read_bytes()


def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a bytes value."""
    decoder.read_bytes()


def read_utf8(decoder, writer_schema=None, named_schemas=None, reader_schema=None, options={}):
    """Decode a string value, honoring the ``handle_unicode_errors`` option."""
    errors = options.get("handle_unicode_errors", "strict")
    return decoder.read_utf8(handle_unicode_errors=errors)


def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    """Consume and discard a string value."""
    decoder.read_utf8()

255 

256 

def read_fixed(decoder, writer_schema, named_schemas=None, reader_schema=None, options={}):
    """Decode a fixed value of exactly ``writer_schema["size"]`` bytes."""
    return decoder.read_fixed(writer_schema["size"])


def skip_fixed(decoder, writer_schema, named_schemas=None):
    """Consume and discard ``writer_schema["size"]`` bytes."""
    decoder.read_fixed(writer_schema["size"])

271 

272 

def read_enum(decoder, writer_schema, named_schemas, reader_schema=None, options={}):
    """Decode an enum index into its writer symbol, resolving against the reader.

    If the symbol is missing from the reader's symbols, the reader's
    ``default`` is used when present; otherwise a SchemaResolutionError
    is raised.
    """
    symbol = writer_schema["symbols"][decoder.read_enum()]
    if not reader_schema or symbol in reader_schema["symbols"]:
        return symbol
    default = reader_schema.get("default")
    if default:
        return default
    symlist = reader_schema["symbols"]
    raise SchemaResolutionError(
        f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
    )


def skip_enum(decoder, writer_schema, named_schemas):
    """Consume and discard an enum index."""
    decoder.read_enum()

294 

295 

def read_array(decoder, writer_schema, named_schemas, reader_schema=None, options={}):
    """Decode an array into a list.

    When a reader schema is given its item schema is resolved against the
    writer's item schema for every element.
    """
    reader_items = reader_schema["items"] if reader_schema else None
    writer_items = writer_schema["items"]

    decoder.read_array_start()
    items = [
        read_data(decoder, writer_items, named_schemas, reader_items, options)
        for _ in decoder.iter_array()
    ]
    decoder.read_array_end()
    return items

342 

343 

def skip_array(decoder, writer_schema, named_schemas):
    """Consume and discard every item of an array."""
    decoder.read_array_start()
    for _ in decoder.iter_array():
        skip_data(decoder, writer_schema["items"], named_schemas)
    decoder.read_array_end()

351 

352 

def read_map(decoder, writer_schema, named_schemas, reader_schema=None, options={}):
    """Decode a map into a dict of string key -> decoded value.

    When a reader schema is given its value schema is resolved against the
    writer's value schema for every entry.
    """
    reader_values = reader_schema["values"] if reader_schema else None
    writer_values = writer_schema["values"]

    result = {}
    decoder.read_map_start()
    for _ in decoder.iter_map():
        key = decoder.read_utf8()
        result[key] = read_data(
            decoder, writer_values, named_schemas, reader_values, options
        )
    decoder.read_map_end()
    return result

393 

394 

def skip_map(decoder, writer_schema, named_schemas):
    """Consume and discard every key/value pair of a map."""
    decoder.read_map_start()
    for _ in decoder.iter_map():
        decoder.read_utf8()  # discard key
        skip_data(decoder, writer_schema["values"], named_schemas)
    decoder.read_map_end()

403 

404 

def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Decode a union value: a branch index followed by that branch's value.

    After decoding, the options ``return_named_type``/``return_record_name``
    (and their ``*_override`` variants) decide whether the result is returned
    bare or as a ``(schema_name, value)`` tuple.
    """
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]
    idx_reader_schema = None

    if reader_schema:
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema, named_schemas):
                result = read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    options,
                )
            else:
                raise SchemaResolutionError(
                    f"schema mismatch: {writer_schema} not found in {reader_schema}"
                )
        else:
            # Reader is also a union: use the first reader branch that
            # matches the writer's selected branch.
            for schema in reader_schema:
                if match_types(idx_schema, schema, named_schemas):
                    idx_reader_schema = schema
                    result = read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        options,
                    )
                    break
            else:
                raise SchemaResolutionError(
                    f"schema mismatch: {writer_schema} not found in {reader_schema}"
                )
    else:
        result = read_data(decoder, idx_schema, named_schemas, None, options)

    # Post-processing: optionally tag the result with the branch's name.
    # return_named_type takes precedence over return_record_name.
    return_record_name_override = options.get("return_record_name_override")
    return_record_name = options.get("return_record_name")
    return_named_type_override = options.get("return_named_type_override")
    return_named_type = options.get("return_named_type")
    if return_named_type_override and is_single_name_union(writer_schema):
        # Only one named type in the union: no tagging needed.
        return result
    elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
        schema_name = (
            idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
        )
        return (schema_name, result)
    elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        schema_name = (
            named_schemas["reader"][idx_reader_schema]["name"]
            if idx_reader_schema
            else named_schemas["writer"][idx_schema]["name"]
        )
        return (schema_name, result)
    elif return_record_name_override and is_single_record_union(writer_schema):
        # Only one record in the union: no tagging needed.
        return result
    elif return_record_name and extract_record_type(idx_schema) == "record":
        schema_name = (
            idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
        )
        return (schema_name, result)
    elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        schema_name = (
            named_schemas["reader"][idx_reader_schema]["name"]
            if idx_reader_schema
            else named_schemas["writer"][idx_schema]["name"]
        )
        return (schema_name, result)
    else:
        return result

487 

488 

def skip_union(decoder, writer_schema, named_schemas):
    """Read the union branch index, then skip that branch's value."""
    branch = decoder.read_index()
    skip_data(decoder, writer_schema[branch], named_schemas)

493 

494 

def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and writer's schema does not have a field with the same name, then the
       reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        # Fast path: no resolution; read fields in writer declaration order.
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                options,
            )
    else:
        # Index reader fields by name and by alias for matching below.
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                readers_field_name = readers_field["name"]
                record[readers_field_name] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    options,
                )
                # Remove the matched field so what remains afterwards are
                # the reader-only fields needing defaults.
                del readers_field_dict[readers_field_name]
            else:
                # Writer-only field: decode and discard its value.
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        for f_name, field in readers_field_dict.items():
            if "default" in field:
                record[field["name"]] = field["default"]
            else:
                msg = f"No default value for field {field['name']} in {reader_schema['name']}"
                raise SchemaResolutionError(msg)

    return record

565 

566 

def skip_record(decoder, writer_schema, named_schemas):
    """Skip every field of a record, in declaration order."""
    for field_schema in writer_schema["fields"]:
        skip_data(decoder, field_schema["type"], named_schemas)

570 

571 

# Dispatch table: Avro type name -> reader function. "error_union" reuses the
# union reader; "error" and "request" reuse the record reader.
READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}

591 

# Dispatch table: Avro type name -> skip function (consumes a value without
# materializing it). Mirrors the READERS table above.
SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}

611 

612 

def maybe_promote(data, writer_type, reader_type):
    """Apply Avro schema promotion to a decoded value.

    int/long promote to float/double; string and bytes promote to each
    other. Any other combination returns the value unchanged (int -> long
    needs no conversion since Python has a single integer type).
    """
    if writer_type in ("int", "long") and reader_type in ("float", "double"):
        return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data

626 

627 

def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read data from file object according to schema.

    Central dispatch: resolves the reader schema, delegates to the
    per-type reader, applies any logical-type conversion, then applies
    primitive promotion. Unknown type names recurse via the named-schema
    registries.
    """

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        reader_schema = match_schemas(
            writer_schema,
            reader_schema,
            named_schemas,
        )

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )
        except StructError:
            # Underlying struct unpack ran out of bytes.
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        # NOTE(review): when writer_schema is a plain string this `in` is a
        # substring test, which is falsy for all primitive type names.
        if "logicalType" in writer_schema:
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        # record_type is the name of a named schema: recurse with the
        # registered writer schema (and reader schema, when registered).
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            options,
        )

677 

678 

def skip_data(decoder, writer_schema, named_schemas):
    """Skip a value of any schema type without materializing it."""
    record_type = extract_record_type(writer_schema)

    skip_fn = SKIPS.get(record_type)
    if skip_fn is None:
        # Not a type keyword: must be a reference to a named schema.
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)
    else:
        skip_fn(decoder, writer_schema, named_schemas)

687 

688 

def skip_sync(fo, sync_marker):
    """Consume one sync marker from *fo*, raising ValueError on mismatch."""
    marker = fo.read(SYNC_SIZE)
    if marker != sync_marker:
        raise ValueError("expected sync marker not found")

693 

694 

def null_read_block(decoder):
    """Read block in "null" codec (no compression)."""
    payload = decoder.read_bytes()
    return BytesIO(payload)

698 

699 

def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    compressed = decoder.read_bytes()
    # wbits=-15: raw deflate stream (no zlib headers/trailer); the negative
    # window-size log requests "raw" decompression. See zlib.h.
    inflater = zlib.decompressobj(-15)
    return BytesIO(inflater.decompress(compressed))

706 

707 

def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    return BytesIO(bz2.decompress(decoder.read_bytes()))

712 

713 

def xz_read_block(decoder):
    """Read block in "xz" codec: a length prefix followed by an lzma stream."""
    size = read_long(decoder)
    return BytesIO(lzma.decompress(decoder.read_fixed(size)))

718 

719 

# Codec name -> function reading one compressed block from the decoder and
# returning a BytesIO of the decompressed payload. Optional codecs (snappy,
# zstandard, lz4) are registered below when their libraries import cleanly.
BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}

726 

727 

def snappy_read_block(decoder):
    """Read block in "snappy" codec: length-prefixed data plus a 4-byte CRC."""
    total = read_long(decoder)
    payload = decoder.read_fixed(total - 4)
    decoder.read_fixed(4)  # trailing CRC; consumed but not verified
    return BytesIO(snappy_decompress(payload))

733 

734 

# Snappy registration: prefer cramjam's raw-snappy decompressor; fall back to
# the deprecated python-snappy package (with a warning); otherwise install a
# stub that raises a helpful "missing codec library" error when used.
try:
    from cramjam import snappy

    snappy_decompress = snappy.decompress_raw
except ImportError:
    try:
        import snappy

        snappy_decompress = snappy.decompress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_READERS["snappy"] = snappy_read_block
else:
    BLOCK_READERS["snappy"] = snappy_read_block

754 

755 

def zstandard_read_block(decoder):
    """Read block in "zstandard" codec: a length prefix followed by zstd data."""
    size = read_long(decoder)
    payload = decoder.read_fixed(size)
    return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(payload))

760 

761 

# Register zstandard support when the `zstandard` package is importable;
# otherwise install a stub that raises a helpful error when the codec is used.
try:
    import zstandard
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block

768 

769 

def lz4_read_block(decoder):
    """Read block in "lz4" codec: a length prefix followed by lz4 block data."""
    size = read_long(decoder)
    payload = decoder.read_fixed(size)
    return BytesIO(lz4.block.decompress(payload))

774 

775 

# Register lz4 support when the `lz4` package is importable; otherwise install
# a stub that raises a helpful error when the codec is used.
try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block

782 

783 

def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro records.

    Reads block after block (record count, compressed payload, sync marker)
    until EOF, yielding each decoded record.
    """
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            # Each block starts with the number of records it contains;
            # EOF here means the file ended cleanly between blocks.
            block_count = decoder.read_long()
        except EOFError:
            return

        # Decompress the block into an in-memory buffer.
        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )

        # Each block is terminated by the file's sync marker.
        skip_sync(decoder.fo, sync_marker)

819 

820 

def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro blocks.

    Like _iter_avro_records, but yields whole Block objects (decompressed
    but not yet decoded) instead of individual records.
    """
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        # Remember where this block starts so its size can be reported.
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except EOFError:
            # Clean end-of-file between blocks.
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        # On-disk size of the block, including count, payload and sync marker.
        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            options,
        )

861 

862 

class Block:
    """One data block of an avro container file; iterating decodes its records.

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        options,
    ):
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.options = options

    def __iter__(self):
        # Decode one record at a time from the (already decompressed) buffer.
        for _ in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

    def __str__(self):
        return (
            f"Avro block: {len(self.bytes_)} bytes, {self.num_records} records, "
            f"codec: {self.codec}, position {self.offset}+{self.size}"
        )

925 

926 

class file_reader(Generic[T]):
    """Base class for avro container-file readers.

    Wraps a file object (or an AvroJSONDecoder), parses the optional reader
    schema, and exposes iteration over ``self._elems`` which subclasses
    must populate.
    """

    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        options={},
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = _default_named_schemas()
        if reader_schema:
            # Parse (and register named types from) the reader schema.
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )

        else:
            self.reader_schema = None
        self.options = options
        # Subclasses assign the record/block iterator here.
        self._elems = None

    def _read_header(self):
        """Parse the container-file header: magic, metadata and sync marker."""
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.options,
            )
        except EOFError:
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        # Absent codec metadata means uncompressed ("null") blocks.
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about
        # defaults might have been writen with a bad default. Since we re-parse
        # the writer schema here, it will now fail. Therefore, if a user
        # provides a reader schema that passes parsing, we will ignore those
        # default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_types
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        """Deprecated alias for the raw writer schema; use ``writer_schema``."""
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        # Subclasses must have created the element iterator.
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)

1006 

1007 

class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        # Bundle the per-read behavior flags for the read_* functions.
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            # JSON input carries no separate writer schema: the provided
            # reader schema doubles as the writer schema and no schema
            # resolution is performed.
            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                # Yield records until the JSON decoder reports completion.
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.options,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            # Binary container file: parse the header, then iterate records.
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

1133 

1134 

class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        # Collect the per-read behavior flags once so they travel through
        # the block-decoding call chain as a single mapping.
        options = dict(
            return_record_name=return_record_name,
            return_record_name_override=return_record_name_override,
            handle_unicode_errors=handle_unicode_errors,
            return_named_type=return_named_type,
            return_named_type_override=return_named_type_override,
        )
        super().__init__(fo, reader_schema, options)

        # The container header (writer schema, codec, sync marker) must be
        # consumed before any data blocks can be iterated.
        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.options,
        )

1226 

1227 

def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    handle_unicode_errors: str = "strict",
    return_named_type: bool = False,
    return_named_type_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # Identical schemas require no resolution pass; drop the reader
        # schema so the fast path is taken.
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    options = dict(
        return_record_name=return_record_name,
        return_record_name_override=return_record_name_override,
        handle_unicode_errors=handle_unicode_errors,
        return_named_type=return_named_type,
        return_named_type_override=return_named_type_override,
    )

    # A schemaless payload is a bare binary-encoded datum with no container
    # header, so decode exactly one value from the stream.
    return read_data(
        BinaryDecoder(fo),
        writer_schema,
        named_schemas,
        reader_schema,
        options,
    )

1312 

1313 

def is_avro(path_or_buffer: Union[str, IO]) -> bool:
    """Return True if path (or buffer) points to an Avro file. This will only
    work for avro files that contain the normal avro schema header like those
    create from :func:`~fastavro._write_py.writer`. This function is not intended
    to be used with binary data created from
    :func:`~fastavro._write_py.schemaless_writer` since that does not include the
    avro header.

    Parameters
    ----------
    path_or_buffer
        Path to file
    """
    # Only close streams this function itself opened; caller-supplied
    # buffers stay open and are left positioned past the magic bytes.
    owns_handle = isinstance(path_or_buffer, str)
    fp: IO = open(path_or_buffer, "rb") if owns_handle else path_or_buffer

    try:
        return fp.read(len(MAGIC)) == MAGIC
    finally:
        if owns_handle:
            fp.close()