Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/_read_py.py: 23% of 416 statements
# cython: auto_cpdef=True

"""Python code for reading AVRO files"""

# This code is a modified version of the code at
# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)

import bz2
import json
import lzma
import zlib
from datetime import datetime, timezone
from decimal import Context
from io import BytesIO
from struct import error as StructError
from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict

from .io.binary_decoder import BinaryDecoder
from .io.json_decoder import AvroJSONDecoder
from .logical_readers import LOGICAL_READERS
from .schema import (
    extract_record_type,
    is_nullable_union,
    extract_logical_type,
    parse_schema,
)
from .types import Schema, AvroMessage, NamedSchemas
from ._read_common import (
    SchemaResolutionError,
    MAGIC,
    SYNC_SIZE,
    HEADER_SCHEMA,
    missing_codec_lib,
)
from .const import NAMED_TYPES, AVRO_TYPES

T = TypeVar("T")

decimal_context = Context()
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
epoch_naive = datetime(1970, 1, 1)


def match_types(writer_type, reader_type):
    if isinstance(writer_type, list) or isinstance(reader_type, list):
        return True
    if isinstance(writer_type, dict) or isinstance(reader_type, dict):
        try:
            return match_schemas(writer_type, reader_type)
        except SchemaResolutionError:
            return False
    if writer_type == reader_type:
        return True
    # promotion cases
    elif writer_type == "int" and reader_type in ["long", "float", "double"]:
        return True
    elif writer_type == "long" and reader_type in ["float", "double"]:
        return True
    elif writer_type == "float" and reader_type == "double":
        return True
    elif writer_type == "string" and reader_type == "bytes":
        return True
    elif writer_type == "bytes" and reader_type == "string":
        return True
    return False
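

# Illustrative sketch (not part of the original module): the promotion cases
# above mirror the Avro spec, so an "int" writer can satisfy a long/float/
# double reader but never the reverse. The `_example_*` name is invented here.
def _example_match_types():  # pragma: no cover
    assert match_types("int", "double")  # int promotes to double
    assert match_types("bytes", "string")  # bytes/string interchange
    assert not match_types("double", "float")  # no demotion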


def match_schemas(w_schema, r_schema):
    error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema):
                return schema
        else:
            raise SchemaResolutionError(error_msg)
    else:
        # Check for dicts as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"]):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"]):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                )

            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            if w_unqual_name == r_unqual_name or w_schema["name"] in r_schema.get(
                "aliases", []
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            if match_types(w_type, r_schema["name"]):
                return r_schema["name"]
        elif match_types(w_type, r_type):
            return r_schema
        raise SchemaResolutionError(error_msg)
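

# Illustrative sketch (not part of the original module): when the reader
# schema is a union, match_schemas returns the first member compatible with
# the writer, e.g. a plain "int" writer against a ["null", "long"] reader
# resolves to "long" via promotion. Incompatible pairs raise instead.
def _example_match_schemas():  # pragma: no cover
    assert match_schemas("int", ["null", "long"]) == "long"
    try:
        match_schemas("string", "int")
    except SchemaResolutionError:
        pass  # expected: no promotion from string to int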


def read_null(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_null()


def skip_null(decoder, writer_schema=None, named_schemas=None):
    decoder.read_null()


def read_boolean(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_boolean()


def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    decoder.read_boolean()


def read_int(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_int()


def skip_int(decoder, writer_schema=None, named_schemas=None):
    decoder.read_int()


def read_long(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_long()


def skip_long(decoder, writer_schema=None, named_schemas=None):
    decoder.read_long()


def read_float(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_float()


def skip_float(decoder, writer_schema=None, named_schemas=None):
    decoder.read_float()


def read_double(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_double()


def skip_double(decoder, writer_schema=None, named_schemas=None):
    decoder.read_double()


def read_bytes(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_bytes()


def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    decoder.read_bytes()


def read_utf8(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    return decoder.read_utf8()


def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    decoder.read_utf8()


def read_fixed(
    decoder,
    writer_schema,
    named_schemas=None,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    size = writer_schema["size"]
    return decoder.read_fixed(size)


def skip_fixed(decoder, writer_schema, named_schemas=None):
    size = writer_schema["size"]
    decoder.read_fixed(size)


def read_enum(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    symbol = writer_schema["symbols"][decoder.read_enum()]
    if reader_schema and symbol not in reader_schema["symbols"]:
        default = reader_schema.get("default")
        if default:
            return default
        else:
            symlist = reader_schema["symbols"]
            msg = f"{symbol} not found in reader symbol list {symlist}"
            raise SchemaResolutionError(msg)
    return symbol
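

# Illustrative sketch (not part of the original module): read_enum falls back
# to the reader's "default" symbol when the written symbol is unknown to the
# reader, shown here through the public schemaless round trip. The schemas
# and the `_example_*` name are invented for this demo.
def _example_enum_default():  # pragma: no cover
    from fastavro import schemaless_writer

    writer = {"type": "enum", "name": "Color", "symbols": ["RED", "TEAL"]}
    reader = {
        "type": "enum",
        "name": "Color",
        "symbols": ["RED", "UNKNOWN"],
        "default": "UNKNOWN",
    }
    buf = BytesIO()
    schemaless_writer(buf, writer, "TEAL")
    buf.seek(0)
    # "TEAL" is not in the reader's symbols, so the reader default is used
    assert schemaless_reader(buf, writer, reader) == "UNKNOWN"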


def skip_enum(decoder, writer_schema, named_schemas):
    decoder.read_enum()


def read_array(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    if reader_schema:

        def item_reader(
            decoder, w_schema, r_schema, return_record_name, return_record_name_override
        ):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                r_schema["items"],
                return_record_name,
                return_record_name_override,
            )

    else:

        def item_reader(
            decoder, w_schema, r_schema, return_record_name, return_record_name_override
        ):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                None,
                return_record_name,
                return_record_name_override,
            )

    read_items = []

    decoder.read_array_start()

    for item in decoder.iter_array():
        read_items.append(
            item_reader(
                decoder,
                writer_schema,
                reader_schema,
                return_record_name,
                return_record_name_override,
            )
        )

    decoder.read_array_end()

    return read_items


def skip_array(decoder, writer_schema, named_schemas):
    decoder.read_array_start()

    for item in decoder.iter_array():
        skip_data(decoder, writer_schema["items"], named_schemas)

    decoder.read_array_end()


def read_map(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                r_schema["values"],
                return_record_name,
                return_record_name_override,
            )

    else:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                None,
                return_record_name,
                return_record_name_override,
            )

    read_items = {}

    decoder.read_map_start()

    for item in decoder.iter_map():
        key = decoder.read_utf8()
        read_items[key] = item_reader(decoder, writer_schema, reader_schema)

    decoder.read_map_end()

    return read_items


def skip_map(decoder, writer_schema, named_schemas):
    decoder.read_map_start()

    for item in decoder.iter_map():
        decoder.read_utf8()
        skip_data(decoder, writer_schema["values"], named_schemas)

    decoder.read_map_end()


def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]

    if reader_schema:
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema):
                return read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    return_record_name,
                    return_record_name_override,
                )
        else:
            for schema in reader_schema:
                if match_types(idx_schema, schema):
                    return read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        return_record_name,
                        return_record_name_override,
                    )
        msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
        raise SchemaResolutionError(msg)
    else:
        if return_record_name_override and is_nullable_union(writer_schema):
            return read_data(
                decoder,
                idx_schema,
                named_schemas,
                None,
                return_record_name,
                return_record_name_override,
            )
        elif return_record_name and extract_record_type(idx_schema) == "record":
            return (
                idx_schema["name"],
                read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    None,
                    return_record_name,
                    return_record_name_override,
                ),
            )
        elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
            # idx_schema is a named type
            return (
                named_schemas["writer"][idx_schema]["name"],
                read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    None,
                    return_record_name,
                    return_record_name_override,
                ),
            )
        else:
            return read_data(decoder, idx_schema, named_schemas)
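

# Illustrative sketch (not part of the original module): how
# return_record_name changes the shape of decoded union values, per the
# branches above. With a ["null", record] union and no reader schema, the
# record branch comes back as a (name, record) tuple. Schema and names here
# are invented for the demo.
def _example_union_record_name():  # pragma: no cover
    from fastavro import schemaless_writer

    schema = [
        "null",
        {
            "type": "record",
            "name": "Rec",
            "fields": [{"name": "x", "type": "int"}],
        },
    ]
    buf = BytesIO()
    schemaless_writer(buf, schema, {"x": 1})
    buf.seek(0)
    value = schemaless_reader(buf, schema, return_record_name=True)
    assert value == ("Rec", {"x": 1})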


def skip_union(decoder, writer_schema, named_schemas):
    # schema resolution
    index = decoder.read_index()
    skip_data(decoder, writer_schema[index], named_schemas)


def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and writer's schema does not have a field with the same name, then the
       reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                return_record_name,
                return_record_name_override,
            )
    else:
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                record[readers_field["name"]] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    return_record_name,
                    return_record_name_override,
                )
            else:
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        if len(readers_field_dict) > len(record):
            writer_fields = [f["name"] for f in writer_schema["fields"]]
            for f_name, field in readers_field_dict.items():
                if f_name not in writer_fields and f_name not in record:
                    if "default" in field:
                        record[field["name"]] = field["default"]
                    else:
                        msg = f'No default value for {field["name"]}'
                        raise SchemaResolutionError(msg)

    return record
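

# Illustrative sketch (not part of the original module): the default-filling
# rule from the docstring above in action. A field present only in the reader
# schema is populated from its "default". Schemas here are invented.
def _example_record_defaults():  # pragma: no cover
    from fastavro import schemaless_writer

    writer = {
        "type": "record",
        "name": "Rec",
        "fields": [{"name": "a", "type": "int"}],
    }
    reader = {
        "type": "record",
        "name": "Rec",
        "fields": [
            {"name": "a", "type": "int"},
            {"name": "b", "type": "string", "default": "missing"},
        ],
    }
    buf = BytesIO()
    schemaless_writer(buf, writer, {"a": 7})
    buf.seek(0)
    assert schemaless_reader(buf, writer, reader) == {"a": 7, "b": "missing"}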


def skip_record(decoder, writer_schema, named_schemas):
    for field in writer_schema["fields"]:
        skip_data(decoder, field["type"], named_schemas)


READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}

SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}


def maybe_promote(data, writer_type, reader_type):
    if writer_type == "int":
        # No need to promote to long since they are the same type in Python
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "long":
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data
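

# Illustrative sketch (not part of the original module): maybe_promote
# converts an already-decoded value to the reader's type after a successful
# schema match. The `_example_*` name is invented here.
def _example_maybe_promote():  # pragma: no cover
    assert maybe_promote(3, "int", "double") == 3.0
    assert maybe_promote("hi", "string", "bytes") == b"hi"
    assert maybe_promote(b"hi", "bytes", "string") == "hi"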


def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    return_record_name=False,
    return_record_name_override=False,
):
    """Read data from file object according to schema."""

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        reader_schema = match_schemas(writer_schema, reader_schema)

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                return_record_name,
                return_record_name_override,
            )
        except StructError:
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        if "logicalType" in writer_schema:
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            return_record_name,
            return_record_name_override,
        )


def skip_data(decoder, writer_schema, named_schemas):
    record_type = extract_record_type(writer_schema)

    reader_fn = SKIPS.get(record_type)
    if reader_fn:
        reader_fn(decoder, writer_schema, named_schemas)
    else:
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)


def skip_sync(fo, sync_marker):
    """Skip an expected sync marker, complaining if it doesn't match"""
    if fo.read(SYNC_SIZE) != sync_marker:
        raise ValueError("expected sync marker not found")


def null_read_block(decoder):
    """Read block in "null" codec."""
    return BytesIO(decoder.read_bytes())


def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    data = decoder.read_bytes()
    # -15 is the log of the window size; negative indicates "raw" (no
    # zlib headers) decompression. See zlib.h.
    return BytesIO(zlib.decompressobj(-15).decompress(data))


def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    data = decoder.read_bytes()
    return BytesIO(bz2.decompress(data))


def xz_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lzma.decompress(data))


BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}


def snappy_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length - 4)
    decoder.read_fixed(4)  # CRC
    return BytesIO(snappy.decompress(data))


try:
    import snappy
except ImportError:
    BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "python-snappy")
else:
    BLOCK_READERS["snappy"] = snappy_read_block


def zstandard_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(zstd.ZstdDecompressor().decompressobj().decompress(data))


try:
    import zstandard as zstd
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block


def lz4_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(data))


try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block
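

# Illustrative sketch (not part of the original module): BLOCK_READERS is a
# plain dict keyed by codec name. Optional codecs whose library failed to
# import are mapped to missing_codec_lib(...), a placeholder that only raises
# when a file actually uses that codec. The `_example_*` name is invented.
def _example_codec_lookup(codec="deflate"):  # pragma: no cover
    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        # Same failure mode as _iter_avro_records below
        raise ValueError(f"Unrecognized codec: {codec}")
    return read_block  # call with a BinaryDecoder positioned at a block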


def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    return_record_name=False,
    return_record_name_override=False,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            block_count = decoder.read_long()
        except StopIteration:
            return

        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                return_record_name,
                return_record_name_override,
            )

        skip_sync(decoder.fo, sync_marker)


def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    return_record_name=False,
    return_record_name_override=False,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except StopIteration:
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            return_record_name,
            return_record_name_override,
        )


class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        return_record_name=False,
        return_record_name_override=False,
    ):
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.return_record_name = return_record_name
        self.return_record_name_override = return_record_name_override

    def __iter__(self):
        for i in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.return_record_name,
                self.return_record_name_override,
            )

    def __str__(self):
        return (
            f"Avro block: {len(self.bytes_)} bytes, "
            + f"{self.num_records} records, "
            + f"codec: {self.codec}, position {self.offset}+{self.size}"
        )
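

# Illustrative sketch (not part of the original module): blocks can be
# inspected, or decoded lazily, without reading every record up front. The
# `path` parameter and `_example_*` name are invented for this demo.
def _example_inspect_blocks(path):  # pragma: no cover
    from fastavro import block_reader

    with open(path, "rb") as fo:
        for block in block_reader(fo):
            print(block)  # Block.__str__ above
            for record in block:  # Block.__iter__ decodes on demand
                pass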


class file_reader(Generic[T]):
    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        return_record_name=False,
        return_record_name_override=False,
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = {"writer": {}, "reader": {}}
        if reader_schema:
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )
        else:
            self.reader_schema = None

        self.return_record_name = return_record_name
        self.return_record_name_override = return_record_name_override
        self._elems = None

    def _read_header(self):
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.return_record_name,
                self.return_record_name_override,
            )
        except (StopIteration, EOFError):
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about
        # defaults might have been written with a bad default. Since we re-parse
        # the writer schema here, it will now fail. Therefore, if a user
        # provides a reader schema that passes parsing, we will ignore those
        # default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_schemas
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)


class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
    ):
        super().__init__(
            fo, reader_schema, return_record_name, return_record_name_override
        )

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.return_record_name,
                        self.return_record_name_override,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.return_record_name,
                self.return_record_name_override,
            )


class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
    ):
        super().__init__(
            fo, reader_schema, return_record_name, return_record_name_override
        )

        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.return_record_name,
            self.return_record_name_override,
        )


def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = {"writer": {}, "reader": {}}
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    decoder = BinaryDecoder(fo)

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        return_record_name,
        return_record_name_override,
    )
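

# Illustrative round trip (not part of the original module): schemaless_writer
# followed by schemaless_reader with a migrated reader schema, exercising the
# int-to-double promotion path. Schemas and the `_example_*` name are
# invented for this demo.
def _example_schemaless_roundtrip():  # pragma: no cover
    from fastavro import schemaless_writer

    writer = {
        "type": "record",
        "name": "Point",
        "fields": [{"name": "x", "type": "int"}],
    }
    reader = {
        "type": "record",
        "name": "Point",
        "fields": [{"name": "x", "type": "double"}],
    }
    buf = BytesIO()
    schemaless_writer(buf, writer, {"x": 2})
    buf.seek(0)
    assert schemaless_reader(buf, writer, reader) == {"x": 2.0}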


def is_avro(path_or_buffer: Union[str, IO]) -> bool:
    """Return True if path (or buffer) points to an Avro file. This will only
    work for avro files that contain the normal avro schema header like those
    created from :func:`~fastavro._write_py.writer`. This function is not
    intended to be used with binary data created from
    :func:`~fastavro._write_py.schemaless_writer` since that does not include
    the avro header.

    Parameters
    ----------
    path_or_buffer
        Path to file or file-like object
    """
    fp: IO
    if isinstance(path_or_buffer, str):
        fp = open(path_or_buffer, "rb")
        close = True
    else:
        fp = path_or_buffer
        close = False

    try:
        header = fp.read(len(MAGIC))
        return header == MAGIC
    finally:
        if close:
            fp.close()
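

# Illustrative sketch (not part of the original module): is_avro only checks
# the leading magic bytes, so container files pass while schemaless payloads
# do not. The schema and `_example_*` name are invented for this demo.
def _example_is_avro():  # pragma: no cover
    from fastavro import writer

    schema = {
        "type": "record",
        "name": "R",
        "fields": [{"name": "a", "type": "int"}],
    }
    buf = BytesIO()
    writer(buf, schema, [{"a": 1}])
    buf.seek(0)
    assert is_avro(buf)  # container file starts with MAGIC
    assert not is_avro(BytesIO(b"not avro"))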