Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/_read_py.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Python code for reading AVRO files"""
3# This code is a modified version of the code at
4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)
7import bz2
8import json
9import lzma
10import zlib
11from datetime import datetime, timezone
12from decimal import Context
13from io import BytesIO
14from struct import error as StructError
15from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict
16from warnings import warn
18from .io.binary_decoder import BinaryDecoder
19from .io.json_decoder import AvroJSONDecoder
20from .logical_readers import LOGICAL_READERS
21from .schema import (
22 extract_record_type,
23 is_single_record_union,
24 is_single_name_union,
25 extract_logical_type,
26 parse_schema,
27)
28from .types import Schema, AvroMessage, NamedSchemas
29from ._read_common import (
30 SchemaResolutionError,
31 MAGIC,
32 SYNC_SIZE,
33 HEADER_SCHEMA,
34 missing_codec_lib,
35)
36from .const import NAMED_TYPES, AVRO_TYPES
# Type variable parameterizing file_reader (see file_reader(Generic[T]) below).
T = TypeVar("T")

# Default decimal context; not referenced in this module — presumably used by
# the decimal logical-type readers. TODO confirm against logical_readers.
decimal_context = Context()

# The Unix epoch, once as an aware UTC datetime and once naive.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
epoch_naive = datetime(1970, 1, 1)
def _default_named_schemas() -> Dict[str, NamedSchemas]:
    """Create fresh, empty registries for writer-side and reader-side named schemas."""
    return {side: {} for side in ("writer", "reader")}
def match_types(writer_type, reader_type, named_schemas):
    """Return True when a value written as *writer_type* may be read as *reader_type*.

    Handles avro's promotion rules and references to named schemas; unions are
    accepted here and resolved later in read_union.
    """
    # Unions on either side: defer the real check until the branch is known.
    if isinstance(writer_type, list) or isinstance(reader_type, list):
        return True
    # Complex schemas are dicts; delegate to the full schema matcher and
    # translate its failure exception into a boolean.
    if isinstance(writer_type, dict) or isinstance(reader_type, dict):
        try:
            return match_schemas(writer_type, reader_type, named_schemas)
        except SchemaResolutionError:
            return False
    if writer_type == reader_type:
        return True
    # Avro type-promotion rules: writer type -> types a reader may use instead.
    promotions = {
        "int": ("long", "float", "double"),
        "long": ("float", "double"),
        "float": ("double",),
        "string": ("bytes",),
        "bytes": ("string",),
    }
    if reader_type in promotions.get(writer_type, ()):
        return True
    # Both sides may be names referring to previously-defined schemas;
    # compare the definitions they point at.
    writer_schema = named_schemas["writer"].get(writer_type)
    reader_schema = named_schemas["reader"].get(reader_type)
    if writer_schema is not None and reader_schema is not None:
        return match_types(writer_schema, reader_schema, named_schemas)
    return False
def match_schemas(w_schema, r_schema, named_schemas):
    """Resolve the writer schema against the reader schema.

    Returns the reader schema to use for decoding (or, when the reader is a
    union, the matching member of that union); raises SchemaResolutionError
    when the two schemas are incompatible.
    """
    error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema, named_schemas):
                return schema
        else:
            # for/else: no union member matched the writer schema
            raise SchemaResolutionError(error_msg)
    else:
        # Check for dicts as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"], named_schemas):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"], named_schemas):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            # fixed types must agree on size exactly
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                )

            # Named types match on unqualified name, or through the reader's
            # aliases (either fully-qualified or unqualified).
            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            r_aliases = r_schema.get("aliases", [])
            if (
                w_unqual_name == r_unqual_name
                or w_schema["name"] in r_aliases
                or w_unqual_name in r_aliases
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            # writer side is a reference to a named schema
            if match_types(w_type, r_schema["name"], named_schemas):
                return r_schema["name"]
        elif match_types(w_type, r_type, named_schemas):
            return r_schema
        raise SchemaResolutionError(error_msg)
def read_null(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro null value via the decoder."""
    return decoder.read_null()
def skip_null(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a null value, discarding it."""
    decoder.read_null()
def read_boolean(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro boolean value via the decoder."""
    return decoder.read_boolean()
def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a boolean value, discarding it."""
    decoder.read_boolean()
def read_int(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro int value via the decoder."""
    return decoder.read_int()
def skip_int(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past an int value, discarding it."""
    decoder.read_int()
def read_long(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro long value via the decoder."""
    return decoder.read_long()
def skip_long(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a long value, discarding it."""
    decoder.read_long()
def read_float(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro float value via the decoder."""
    return decoder.read_float()
def skip_float(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a float value, discarding it."""
    decoder.read_float()
def read_double(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro double value via the decoder."""
    return decoder.read_double()
def skip_double(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a double value, discarding it."""
    decoder.read_double()
def read_bytes(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro bytes value via the decoder."""
    return decoder.read_bytes()
def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a bytes value, discarding it."""
    decoder.read_bytes()
def read_utf8(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read an avro string, applying the ``handle_unicode_errors`` option (default "strict")."""
    errors = options.get("handle_unicode_errors", "strict")
    return decoder.read_utf8(handle_unicode_errors=errors)
def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    """Advance the decoder past a string value, discarding it."""
    decoder.read_utf8()
def read_fixed(
    decoder,
    writer_schema,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    """Read exactly ``writer_schema["size"]`` bytes of a fixed value."""
    return decoder.read_fixed(writer_schema["size"])
def skip_fixed(decoder, writer_schema, named_schemas=None):
    """Advance the decoder past a fixed value of ``writer_schema["size"]`` bytes."""
    decoder.read_fixed(writer_schema["size"])
def read_enum(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read an enum index and map it to its symbol, resolving against the reader schema.

    When the symbol is unknown to the reader, fall back to the reader's
    ``default`` or raise SchemaResolutionError.
    """
    symbol = writer_schema["symbols"][decoder.read_enum()]
    if not reader_schema or symbol in reader_schema["symbols"]:
        return symbol
    default = reader_schema.get("default")
    if default:
        return default
    symlist = reader_schema["symbols"]
    msg = f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
    raise SchemaResolutionError(msg)
def skip_enum(decoder, writer_schema, named_schemas):
    """Advance the decoder past an enum index, discarding it."""
    decoder.read_enum()
def read_array(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read an avro array into a python list, resolving item schemas if a reader schema is given."""
    item_schema = writer_schema["items"]
    # When migrating schemas, each item is resolved against the reader's
    # item schema; otherwise it is decoded as written.
    reader_item_schema = reader_schema["items"] if reader_schema else None

    decoder.read_array_start()
    items = [
        read_data(decoder, item_schema, named_schemas, reader_item_schema, options)
        for _ in decoder.iter_array()
    ]
    decoder.read_array_end()
    return items
def skip_array(decoder, writer_schema, named_schemas):
    """Advance the decoder past an array without materializing its items."""
    item_schema = writer_schema["items"]
    decoder.read_array_start()
    for _ in decoder.iter_array():
        skip_data(decoder, item_schema, named_schemas)
    decoder.read_array_end()
def read_map(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read an avro map into a python dict, resolving value schemas if a reader schema is given."""
    value_schema = writer_schema["values"]
    # When migrating schemas, each value is resolved against the reader's
    # value schema; otherwise it is decoded as written.
    reader_value_schema = reader_schema["values"] if reader_schema else None

    result = {}
    decoder.read_map_start()
    for _ in decoder.iter_map():
        # Each entry is a string key followed by a value.
        key = decoder.read_utf8()
        result[key] = read_data(
            decoder, value_schema, named_schemas, reader_value_schema, options
        )
    decoder.read_map_end()
    return result
def skip_map(decoder, writer_schema, named_schemas):
    """Advance the decoder past a map without materializing its entries."""
    value_schema = writer_schema["values"]
    decoder.read_map_start()
    for _ in decoder.iter_map():
        decoder.read_utf8()
        skip_data(decoder, value_schema, named_schemas)
    decoder.read_map_end()
def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read a union value: a branch index followed by data in that branch's schema.

    Depending on the ``return_record_name`` / ``return_named_type`` options the
    result may be wrapped in a ``(name, value)`` tuple.
    """
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]

    if reader_schema:
        msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema, named_schemas):
                result = read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    options,
                )
            else:
                raise SchemaResolutionError(msg)
        else:
            # Find the first reader union member compatible with the branch
            # that was actually written.
            for schema in reader_schema:
                if match_types(idx_schema, schema, named_schemas):
                    result = read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        options,
                    )
                    break
            else:
                # for/else: no reader member matched
                raise SchemaResolutionError(msg)
    else:
        result = read_data(decoder, idx_schema, named_schemas, None, options)

    # Optionally tag the result with the name of the union branch. The
    # "named_type" options take precedence over the "record_name" options,
    # and the "*_override" options suppress tagging for single-member unions.
    return_record_name_override = options.get("return_record_name_override")
    return_record_name = options.get("return_record_name")
    return_named_type_override = options.get("return_named_type_override")
    return_named_type = options.get("return_named_type")
    if return_named_type_override and is_single_name_union(writer_schema):
        return result
    elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
        return (idx_schema["name"], result)
    elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        return (named_schemas["writer"][idx_schema]["name"], result)
    elif return_record_name_override and is_single_record_union(writer_schema):
        return result
    elif return_record_name and extract_record_type(idx_schema) == "record":
        return (idx_schema["name"], result)
    elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        return (named_schemas["writer"][idx_schema]["name"], result)
    else:
        return result
def skip_union(decoder, writer_schema, named_schemas):
    """Skip a union value: read the branch index, then skip that branch's data."""
    branch_schema = writer_schema[decoder.read_index()]
    skip_data(decoder, branch_schema, named_schemas)
def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and writer's schema does not have a field with the same name, then the
       reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        # Fast path: no schema migration; decode fields in writer order.
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                options,
            )
    else:
        # Index the reader's fields by name and by alias so writer fields can
        # be matched in O(1).
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                record[readers_field["name"]] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    options,
                )
            else:
                # The reader doesn't know this field; consume and discard it.
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        if len(readers_field_dict) > len(record):
            writer_fields = [f["name"] for f in writer_schema["fields"]]
            for f_name, field in readers_field_dict.items():
                if f_name not in writer_fields and f_name not in record:
                    if "default" in field:
                        record[field["name"]] = field["default"]
                    else:
                        msg = f"No default value for field {field['name']} in {reader_schema['name']}"
                        raise SchemaResolutionError(msg)

    return record
def skip_record(decoder, writer_schema, named_schemas):
    """Skip every field of a record, in declaration order."""
    for f in writer_schema["fields"]:
        skip_data(decoder, f["type"], named_schemas)
# Dispatch table: avro type name -> reader function. read_data falls back to
# the named-schema registry for type names not present here.
READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}
# Dispatch table: avro type name -> skip function (consumes a value without
# building a python object). Mirrors READERS.
SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}
def maybe_promote(data, writer_type, reader_type):
    """Convert an already-decoded value according to avro's type promotion rules."""
    floating = ("float", "double")
    if writer_type == "int" and reader_type in floating:
        # No need to promote int to long since they are the same type in Python
        return float(data)
    if writer_type == "long" and reader_type in floating:
        return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data
def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read data from file object according to schema."""

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        # Resolve writer vs reader schema up front; raises
        # SchemaResolutionError if they are incompatible.
        reader_schema = match_schemas(
            writer_schema,
            reader_schema,
            named_schemas,
        )

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )
        except StructError:
            # struct raises its own error on truncated input; normalize to
            # EOFError so callers can detect end-of-stream uniformly.
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        if "logicalType" in writer_schema:
            # Apply the logical-type conversion (e.g. decimal, timestamp) to
            # the raw decoded value; unknown logical types pass through below.
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        # record_type is not a base avro type, so it must be a reference to a
        # named schema; recurse with that schema's definition.
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            options,
        )
def skip_data(decoder, writer_schema, named_schemas):
    """Advance the decoder past one value of ``writer_schema`` without decoding it."""
    record_type = extract_record_type(writer_schema)

    skip_fn = SKIPS.get(record_type)
    if skip_fn is not None:
        skip_fn(decoder, writer_schema, named_schemas)
    else:
        # Not a base avro type: must be a reference to a named schema.
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)
def skip_sync(fo, sync_marker):
    """Skip an expected sync marker, complaining if it doesn't match"""
    marker = fo.read(SYNC_SIZE)
    if marker != sync_marker:
        raise ValueError("expected sync marker not found")
def null_read_block(decoder):
    """Read block in "null" codec (block payload is stored uncompressed)."""
    raw = decoder.read_bytes()
    return BytesIO(raw)
def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    compressed = decoder.read_bytes()
    # -15 is the log of the window size; negative indicates "raw" (no
    # zlib headers) decompression. See zlib.h.
    return BytesIO(zlib.decompressobj(-15).decompress(compressed))
def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    return BytesIO(bz2.decompress(decoder.read_bytes()))
def xz_read_block(decoder):
    """Read block in "xz" codec: a long length prefix followed by an lzma stream."""
    length = decoder.read_long()
    return BytesIO(lzma.decompress(decoder.read_fixed(length)))
# Codec name -> function returning a BytesIO of the decompressed block.
# Optional codecs (snappy, zstandard, lz4) are registered below depending on
# which libraries are importable.
BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}
def snappy_read_block(decoder):
    """Read block in "snappy" codec: length-prefixed data followed by a 4-byte CRC."""
    length = decoder.read_long()
    data = decoder.read_fixed(length - 4)
    decoder.read_fixed(4)  # CRC; read and discarded (not verified here)
    return BytesIO(snappy_decompress(data))
# Register the "snappy" codec: prefer cramjam; fall back to python-snappy
# (with a deprecation warning); otherwise install a stub that raises a helpful
# error when the codec is actually used.
try:
    from cramjam import snappy

    snappy_decompress = snappy.decompress_raw
except ImportError:
    try:
        import snappy

        snappy_decompress = snappy.decompress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_READERS["snappy"] = snappy_read_block
else:
    BLOCK_READERS["snappy"] = snappy_read_block
def zstandard_read_block(decoder):
    """Read block in "zstandard" codec: a long length prefix followed by a zstd frame."""
    length = decoder.read_long()
    compressed = decoder.read_fixed(length)
    return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(compressed))
# Register the "zstandard" codec only when the zstandard package is installed;
# otherwise install a stub that raises when the codec is used.
try:
    import zstandard
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block
def lz4_read_block(decoder):
    """Read block in "lz4" codec: a long length prefix followed by an lz4 block."""
    length = decoder.read_long()
    compressed = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(compressed))
# Register the "lz4" codec only when the lz4 package is installed; otherwise
# install a stub that raises when the codec is used.
try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block
def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            # Each block starts with its record count.
            block_count = decoder.read_long()
        except EOFError:
            # EOF at a block boundary means the file is cleanly exhausted.
            return

        # Decompress the whole block into an in-memory buffer, then decode
        # each record from that buffer.
        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )

        # Every block is terminated by the file's sync marker.
        skip_sync(decoder.fo, sync_marker)
def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        # Remember where the block starts so its offset/size can be reported.
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except EOFError:
            # EOF at a block boundary means the file is cleanly exhausted.
            return

        # Decompress the block payload but do not decode the records yet;
        # the returned Block decodes lazily when iterated.
        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            options,
        )
class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        options,
    ):
        # bytes_ is the decompressed block payload as a file-like object.
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.options = options

    def __iter__(self):
        # Decode records lazily from the in-memory payload.
        for i in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

    def __str__(self):
        return (
            f"Avro block: {len(self.bytes_)} bytes, "
            + f"{self.num_records} records, "
            + f"codec: {self.codec}, position {self.offset}+{self.size}"
        )
class file_reader(Generic[T]):
    """Base class shared by `reader` and `block_reader`: handles decoder setup,
    reader-schema parsing, container-file header parsing, and iteration."""

    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        options={},
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = _default_named_schemas()
        if reader_schema:
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )

        else:
            self.reader_schema = None
        self.options = options
        # Subclasses assign the record/block iterator here.
        self._elems = None

    def _read_header(self):
        """Parse the container-file header: metadata, writer schema, and codec."""
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.options,
            )
        except EOFError:
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about
        # defaults might have been written with a bad default. Since we re-parse
        # the writer schema here, it will now fail. Therefore, if a user
        # provides a reader schema that passes parsing, we will ignore those
        # default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_types
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        # Deprecated alias for the raw writer schema.
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)
class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            # For the JSON decoder the schema passed in is the writer schema;
            # move it (and its named-schema registry) into the writer slots.
            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                # Yield records until the JSON decoder reports end of input.
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.options,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            # Binary container file: parse the header, then iterate records
            # block by block.
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )
class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        # Blocks are only supported for binary container files, so the header
        # is always read here (no JSON-decoder branch).
        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.options,
        )
def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    handle_unicode_errors: str = "strict",
    return_named_type: bool = False,
    return_named_type_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    # Schemaless data is always binary-encoded (no container header).
    decoder = BinaryDecoder(fo)

    options = {
        "return_record_name": return_record_name,
        "return_record_name_override": return_record_name_override,
        "handle_unicode_errors": handle_unicode_errors,
        "return_named_type": return_named_type,
        "return_named_type_override": return_named_type_override,
    }

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        options,
    )
def is_avro(path_or_buffer: Union[str, IO]) -> bool:
    """Return True when *path_or_buffer* starts with the avro magic bytes.

    This only detects container files that carry the normal avro header, such
    as those created by :func:`~fastavro._write_py.writer`. It is not intended
    for binary data produced by :func:`~fastavro._write_py.schemaless_writer`,
    which has no header.

    Parameters
    ----------
    path_or_buffer
        Path to a file, or a readable binary file-like object
    """
    if isinstance(path_or_buffer, str):
        # Given a path: open, check, and always close the file ourselves.
        with open(path_or_buffer, "rb") as fp:
            return fp.read(len(MAGIC)) == MAGIC
    # Given a buffer: read from it but leave closing to the caller.
    return path_or_buffer.read(len(MAGIC)) == MAGIC