1"""Python code for reading AVRO files"""
2
3# This code is a modified version of the code at
4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)
6
7import bz2
8import json
9import lzma
10import zlib
11from datetime import datetime, timezone
12from decimal import Context
13from io import BytesIO
14from struct import error as StructError
15from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict
16from warnings import warn
17
18from .io.binary_decoder import BinaryDecoder
19from .io.json_decoder import AvroJSONDecoder
20from .logical_readers import LOGICAL_READERS
21from .schema import (
22 extract_record_type,
23 is_single_record_union,
24 is_single_name_union,
25 extract_logical_type,
26 parse_schema,
27)
28from .types import Schema, AvroMessage, NamedSchemas
29from ._read_common import (
30 SchemaResolutionError,
31 MAGIC,
32 SYNC_SIZE,
33 HEADER_SCHEMA,
34 missing_codec_lib,
35)
36from .const import NAMED_TYPES, AVRO_TYPES
37
38T = TypeVar("T")
39
40decimal_context = Context()
41epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
42epoch_naive = datetime(1970, 1, 1)
43
44
45def _default_named_schemas() -> Dict[str, NamedSchemas]:
46 return {"writer": {}, "reader": {}}
47
48
49def match_types(writer_type, reader_type, named_schemas):
50 if isinstance(writer_type, list) or isinstance(reader_type, list):
51 return True
52 if isinstance(writer_type, dict) or isinstance(reader_type, dict):
53 try:
54 return match_schemas(writer_type, reader_type, named_schemas)
55 except SchemaResolutionError:
56 return False
57 if writer_type == reader_type:
58 return True
59 # promotion cases
60 elif writer_type == "int" and reader_type in ["long", "float", "double"]:
61 return True
62 elif writer_type == "long" and reader_type in ["float", "double"]:
63 return True
64 elif writer_type == "float" and reader_type == "double":
65 return True
66 elif writer_type == "string" and reader_type == "bytes":
67 return True
68 elif writer_type == "bytes" and reader_type == "string":
69 return True
70 writer_schema = named_schemas["writer"].get(writer_type)
71 reader_schema = named_schemas["reader"].get(reader_type)
72 if writer_schema is not None and reader_schema is not None:
73 return match_types(writer_schema, reader_schema, named_schemas)
74 return False
75
76
77def match_schemas(w_schema, r_schema, named_schemas):
78 error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
79 if isinstance(w_schema, list):
80 # If the writer is a union, checks will happen in read_union after the
81 # correct schema is known
82 return r_schema
83 elif isinstance(r_schema, list):
84 # If the reader is a union, ensure one of the new schemas is the same
85 # as the writer
86 for schema in r_schema:
87 if match_types(w_schema, schema, named_schemas):
88 return schema
89 else:
90 raise SchemaResolutionError(error_msg)
91 else:
92 # Check for dicts as primitive types are just strings
93 if isinstance(w_schema, dict):
94 w_type = w_schema["type"]
95 else:
96 w_type = w_schema
97 if isinstance(r_schema, dict):
98 r_type = r_schema["type"]
99 else:
100 r_type = r_schema
101
102 if w_type == r_type == "map":
103 if match_types(w_schema["values"], r_schema["values"], named_schemas):
104 return r_schema
105 elif w_type == r_type == "array":
106 if match_types(w_schema["items"], r_schema["items"], named_schemas):
107 return r_schema
108 elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
109 if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
110 raise SchemaResolutionError(
111 f"Schema mismatch: {w_schema} size is different than {r_schema} size"
112 )
113
114 w_unqual_name = w_schema["name"].split(".")[-1]
115 r_unqual_name = r_schema["name"].split(".")[-1]
116 r_aliases = r_schema.get("aliases", [])
117 if (
118 w_unqual_name == r_unqual_name
119 or w_schema["name"] in r_aliases
120 or w_unqual_name in r_aliases
121 ):
122 return r_schema
123 elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
124 if match_types(w_type, r_schema["name"], named_schemas):
125 return r_schema["name"]
126 elif match_types(w_type, r_type, named_schemas):
127 return r_schema
128 raise SchemaResolutionError(error_msg)
129
130
131def read_null(
132 decoder,
133 writer_schema=None,
134 named_schemas=None,
135 reader_schema=None,
136 options={},
137):
138 return decoder.read_null()
139
140
141def skip_null(decoder, writer_schema=None, named_schemas=None):
142 decoder.read_null()
143
144
145def read_boolean(
146 decoder,
147 writer_schema=None,
148 named_schemas=None,
149 reader_schema=None,
150 options={},
151):
152 return decoder.read_boolean()
153
154
155def skip_boolean(decoder, writer_schema=None, named_schemas=None):
156 decoder.read_boolean()
157
158
159def read_int(
160 decoder,
161 writer_schema=None,
162 named_schemas=None,
163 reader_schema=None,
164 options={},
165):
166 return decoder.read_int()
167
168
169def skip_int(decoder, writer_schema=None, named_schemas=None):
170 decoder.read_int()
171
172
173def read_long(
174 decoder,
175 writer_schema=None,
176 named_schemas=None,
177 reader_schema=None,
178 options={},
179):
180 return decoder.read_long()
181
182
183def skip_long(decoder, writer_schema=None, named_schemas=None):
184 decoder.read_long()
185
186
187def read_float(
188 decoder,
189 writer_schema=None,
190 named_schemas=None,
191 reader_schema=None,
192 options={},
193):
194 return decoder.read_float()
195
196
197def skip_float(decoder, writer_schema=None, named_schemas=None):
198 decoder.read_float()
199
200
201def read_double(
202 decoder,
203 writer_schema=None,
204 named_schemas=None,
205 reader_schema=None,
206 options={},
207):
208 return decoder.read_double()
209
210
211def skip_double(decoder, writer_schema=None, named_schemas=None):
212 decoder.read_double()
213
214
215def read_bytes(
216 decoder,
217 writer_schema=None,
218 named_schemas=None,
219 reader_schema=None,
220 options={},
221):
222 return decoder.read_bytes()
223
224
225def skip_bytes(decoder, writer_schema=None, named_schemas=None):
226 decoder.read_bytes()
227
228
229def read_utf8(
230 decoder,
231 writer_schema=None,
232 named_schemas=None,
233 reader_schema=None,
234 options={},
235):
236 return decoder.read_utf8(
237 handle_unicode_errors=options.get("handle_unicode_errors", "strict")
238 )
239
240
241def skip_utf8(decoder, writer_schema=None, named_schemas=None):
242 decoder.read_utf8()
243
244
245def read_fixed(
246 decoder,
247 writer_schema,
248 named_schemas=None,
249 reader_schema=None,
250 options={},
251):
252 size = writer_schema["size"]
253 return decoder.read_fixed(size)
254
255
256def skip_fixed(decoder, writer_schema, named_schemas=None):
257 size = writer_schema["size"]
258 decoder.read_fixed(size)
259
260
261def read_enum(
262 decoder,
263 writer_schema,
264 named_schemas,
265 reader_schema=None,
266 options={},
267):
268 symbol = writer_schema["symbols"][decoder.read_enum()]
269 if reader_schema and symbol not in reader_schema["symbols"]:
270 default = reader_schema.get("default")
271 if default:
272 return default
273 else:
274 symlist = reader_schema["symbols"]
275 msg = f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
276 raise SchemaResolutionError(msg)
277 return symbol
278
279
280def skip_enum(decoder, writer_schema, named_schemas):
281 decoder.read_enum()
282
283
284def read_array(
285 decoder,
286 writer_schema,
287 named_schemas,
288 reader_schema=None,
289 options={},
290):
291 if reader_schema:
292
293 def item_reader(decoder, w_schema, r_schema, options):
294 return read_data(
295 decoder,
296 w_schema["items"],
297 named_schemas,
298 r_schema["items"],
299 options,
300 )
301
302 else:
303
304 def item_reader(decoder, w_schema, r_schema, options):
305 return read_data(
306 decoder,
307 w_schema["items"],
308 named_schemas,
309 None,
310 options,
311 )
312
313 read_items = []
314
315 decoder.read_array_start()
316
317 for item in decoder.iter_array():
318 read_items.append(
319 item_reader(
320 decoder,
321 writer_schema,
322 reader_schema,
323 options,
324 )
325 )
326
327 decoder.read_array_end()
328
329 return read_items
330
331
332def skip_array(decoder, writer_schema, named_schemas):
333 decoder.read_array_start()
334
335 for item in decoder.iter_array():
336 skip_data(decoder, writer_schema["items"], named_schemas)
337
338 decoder.read_array_end()
339
340
341def read_map(
342 decoder,
343 writer_schema,
344 named_schemas,
345 reader_schema=None,
346 options={},
347):
348 if reader_schema:
349
350 def item_reader(decoder, w_schema, r_schema):
351 return read_data(
352 decoder,
353 w_schema["values"],
354 named_schemas,
355 r_schema["values"],
356 options,
357 )
358
359 else:
360
361 def item_reader(decoder, w_schema, r_schema):
362 return read_data(
363 decoder,
364 w_schema["values"],
365 named_schemas,
366 None,
367 options,
368 )
369
370 read_items = {}
371
372 decoder.read_map_start()
373
374 for item in decoder.iter_map():
375 key = decoder.read_utf8()
376 read_items[key] = item_reader(decoder, writer_schema, reader_schema)
377
378 decoder.read_map_end()
379
380 return read_items
381
382
383def skip_map(decoder, writer_schema, named_schemas):
384 decoder.read_map_start()
385
386 for item in decoder.iter_map():
387 decoder.read_utf8()
388 skip_data(decoder, writer_schema["values"], named_schemas)
389
390 decoder.read_map_end()
391
392
393def read_union(
394 decoder,
395 writer_schema,
396 named_schemas,
397 reader_schema=None,
398 options={},
399):
400 # schema resolution
401 index = decoder.read_index()
402 idx_schema = writer_schema[index]
403
404 if reader_schema:
405 msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
406 # Handle case where the reader schema is just a single type (not union)
407 if not isinstance(reader_schema, list):
408 if match_types(idx_schema, reader_schema, named_schemas):
409 result = read_data(
410 decoder,
411 idx_schema,
412 named_schemas,
413 reader_schema,
414 options,
415 )
416 else:
417 raise SchemaResolutionError(msg)
418 else:
419 for schema in reader_schema:
420 if match_types(idx_schema, schema, named_schemas):
421 result = read_data(
422 decoder,
423 idx_schema,
424 named_schemas,
425 schema,
426 options,
427 )
428 break
429 else:
430 raise SchemaResolutionError(msg)
431 else:
432 result = read_data(decoder, idx_schema, named_schemas, None, options)
433
434 return_record_name_override = options.get("return_record_name_override")
435 return_record_name = options.get("return_record_name")
436 return_named_type_override = options.get("return_named_type_override")
437 return_named_type = options.get("return_named_type")
438 if return_named_type_override and is_single_name_union(writer_schema):
439 return result
440 elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
441 return (idx_schema["name"], result)
442 elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
443 # idx_schema is a named type
444 return (named_schemas["writer"][idx_schema]["name"], result)
445 elif return_record_name_override and is_single_record_union(writer_schema):
446 return result
447 elif return_record_name and extract_record_type(idx_schema) == "record":
448 return (idx_schema["name"], result)
449 elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
450 # idx_schema is a named type
451 return (named_schemas["writer"][idx_schema]["name"], result)
452 else:
453 return result
454
455
456def skip_union(decoder, writer_schema, named_schemas):
457 # schema resolution
458 index = decoder.read_index()
459 skip_data(decoder, writer_schema[index], named_schemas)
460
461
462def read_record(
463 decoder,
464 writer_schema,
465 named_schemas,
466 reader_schema=None,
467 options={},
468):
469 """A record is encoded by encoding the values of its fields in the order
470 that they are declared. In other words, a record is encoded as just the
471 concatenation of the encodings of its fields. Field values are encoded per
472 their schema.
473
474 Schema Resolution:
475 * the ordering of fields may be different: fields are matched by name.
476 * schemas for fields with the same name in both records are resolved
477 recursively.
478 * if the writer's record contains a field with a name not present in the
479 reader's record, the writer's value for that field is ignored.
480 * if the reader's record schema has a field that contains a default value,
481 and writer's schema does not have a field with the same name, then the
482 reader should use the default value from its field.
483 * if the reader's record schema has a field with no default value, and
484 writer's schema does not have a field with the same name, then the
485 field's value is unset.
486 """
487 record = {}
488 if reader_schema is None:
489 for field in writer_schema["fields"]:
490 record[field["name"]] = read_data(
491 decoder,
492 field["type"],
493 named_schemas,
494 None,
495 options,
496 )
497 else:
498 readers_field_dict = {}
499 aliases_field_dict = {}
500 for f in reader_schema["fields"]:
501 readers_field_dict[f["name"]] = f
502 for alias in f.get("aliases", []):
503 aliases_field_dict[alias] = f
504
505 for field in writer_schema["fields"]:
506 readers_field = readers_field_dict.get(
507 field["name"],
508 aliases_field_dict.get(field["name"]),
509 )
510 if readers_field:
511 record[readers_field["name"]] = read_data(
512 decoder,
513 field["type"],
514 named_schemas,
515 readers_field["type"],
516 options,
517 )
518 else:
519 skip_data(decoder, field["type"], named_schemas)
520
521 # fill in default values
522 if len(readers_field_dict) > len(record):
523 writer_fields = [f["name"] for f in writer_schema["fields"]]
524 for f_name, field in readers_field_dict.items():
525 if f_name not in writer_fields and f_name not in record:
526 if "default" in field:
527 record[field["name"]] = field["default"]
528 else:
529 msg = f"No default value for field {field['name']} in {reader_schema['name']}"
530 raise SchemaResolutionError(msg)
531
532 return record
533
534
535def skip_record(decoder, writer_schema, named_schemas):
536 for field in writer_schema["fields"]:
537 skip_data(decoder, field["type"], named_schemas)
538
539
540READERS = {
541 "null": read_null,
542 "boolean": read_boolean,
543 "string": read_utf8,
544 "int": read_int,
545 "long": read_long,
546 "float": read_float,
547 "double": read_double,
548 "bytes": read_bytes,
549 "fixed": read_fixed,
550 "enum": read_enum,
551 "array": read_array,
552 "map": read_map,
553 "union": read_union,
554 "error_union": read_union,
555 "record": read_record,
556 "error": read_record,
557 "request": read_record,
558}
559
560SKIPS = {
561 "null": skip_null,
562 "boolean": skip_boolean,
563 "string": skip_utf8,
564 "int": skip_int,
565 "long": skip_long,
566 "float": skip_float,
567 "double": skip_double,
568 "bytes": skip_bytes,
569 "fixed": skip_fixed,
570 "enum": skip_enum,
571 "array": skip_array,
572 "map": skip_map,
573 "union": skip_union,
574 "error_union": skip_union,
575 "record": skip_record,
576 "error": skip_record,
577 "request": skip_record,
578}
579
580
581def maybe_promote(data, writer_type, reader_type):
582 if writer_type == "int":
583 # No need to promote to long since they are the same type in Python
584 if reader_type == "float" or reader_type == "double":
585 return float(data)
586 if writer_type == "long":
587 if reader_type == "float" or reader_type == "double":
588 return float(data)
589 if writer_type == "string" and reader_type == "bytes":
590 return data.encode()
591 if writer_type == "bytes" and reader_type == "string":
592 return data.decode()
593 return data
594
595
596def read_data(
597 decoder,
598 writer_schema,
599 named_schemas,
600 reader_schema=None,
601 options={},
602):
603 """Read data from file object according to schema."""
604
605 record_type = extract_record_type(writer_schema)
606
607 if reader_schema:
608 reader_schema = match_schemas(
609 writer_schema,
610 reader_schema,
611 named_schemas,
612 )
613
614 reader_fn = READERS.get(record_type)
615 if reader_fn:
616 try:
617 data = reader_fn(
618 decoder,
619 writer_schema,
620 named_schemas,
621 reader_schema,
622 options,
623 )
624 except StructError:
625 raise EOFError(f"cannot read {record_type} from {decoder.fo}")
626
627 if "logicalType" in writer_schema:
628 logical_type = extract_logical_type(writer_schema)
629 fn = LOGICAL_READERS.get(logical_type)
630 if fn:
631 return fn(data, writer_schema, reader_schema)
632
633 if reader_schema is not None:
634 return maybe_promote(data, record_type, extract_record_type(reader_schema))
635 else:
636 return data
637 else:
638 return read_data(
639 decoder,
640 named_schemas["writer"][record_type],
641 named_schemas,
642 named_schemas["reader"].get(reader_schema),
643 options,
644 )
645
646
647def skip_data(decoder, writer_schema, named_schemas):
648 record_type = extract_record_type(writer_schema)
649
650 reader_fn = SKIPS.get(record_type)
651 if reader_fn:
652 reader_fn(decoder, writer_schema, named_schemas)
653 else:
654 skip_data(decoder, named_schemas["writer"][record_type], named_schemas)
655
656
657def skip_sync(fo, sync_marker):
658 """Skip an expected sync marker, complaining if it doesn't match"""
659 if fo.read(SYNC_SIZE) != sync_marker:
660 raise ValueError("expected sync marker not found")
661
662
663def null_read_block(decoder):
664 """Read block in "null" codec."""
665 return BytesIO(decoder.read_bytes())
666
667
668def deflate_read_block(decoder):
669 """Read block in "deflate" codec."""
670 data = decoder.read_bytes()
671 # -15 is the log of the window size; negative indicates "raw" (no
672 # zlib headers) decompression. See zlib.h.
673 return BytesIO(zlib.decompressobj(-15).decompress(data))
674
675
676def bzip2_read_block(decoder):
677 """Read block in "bzip2" codec."""
678 data = decoder.read_bytes()
679 return BytesIO(bz2.decompress(data))
680
681
682def xz_read_block(decoder):
683 length = read_long(decoder)
684 data = decoder.read_fixed(length)
685 return BytesIO(lzma.decompress(data))
686
687
688BLOCK_READERS = {
689 "null": null_read_block,
690 "deflate": deflate_read_block,
691 "bzip2": bzip2_read_block,
692 "xz": xz_read_block,
693}
694
695
696def snappy_read_block(decoder):
697 length = read_long(decoder)
698 data = decoder.read_fixed(length - 4)
699 decoder.read_fixed(4) # CRC
700 return BytesIO(snappy_decompress(data))
701
702
703try:
704 from cramjam import snappy
705
706 snappy_decompress = snappy.decompress_raw
707except ImportError:
708 try:
709 import snappy
710
711 snappy_decompress = snappy.decompress
712 warn(
713 "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
714 DeprecationWarning,
715 )
716 except ImportError:
717 BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
718 else:
719 BLOCK_READERS["snappy"] = snappy_read_block
720else:
721 BLOCK_READERS["snappy"] = snappy_read_block
722
723
724def zstandard_read_block(decoder):
725 length = read_long(decoder)
726 data = decoder.read_fixed(length)
727 return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(data))
728
729
730try:
731 import zstandard
732except ImportError:
733 BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
734else:
735 BLOCK_READERS["zstandard"] = zstandard_read_block
736
737
738def lz4_read_block(decoder):
739 length = read_long(decoder)
740 data = decoder.read_fixed(length)
741 return BytesIO(lz4.block.decompress(data))
742
743
744try:
745 import lz4.block
746except ImportError:
747 BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
748else:
749 BLOCK_READERS["lz4"] = lz4_read_block
750
751
752def _iter_avro_records(
753 decoder,
754 header,
755 codec,
756 writer_schema,
757 named_schemas,
758 reader_schema,
759 options,
760):
761 """Return iterator over avro records."""
762 sync_marker = header["sync"]
763
764 read_block = BLOCK_READERS.get(codec)
765 if not read_block:
766 raise ValueError(f"Unrecognized codec: {codec}")
767
768 block_count = 0
769 while True:
770 try:
771 block_count = decoder.read_long()
772 except EOFError:
773 return
774
775 block_fo = read_block(decoder)
776
777 for i in range(block_count):
778 yield read_data(
779 BinaryDecoder(block_fo),
780 writer_schema,
781 named_schemas,
782 reader_schema,
783 options,
784 )
785
786 skip_sync(decoder.fo, sync_marker)
787
788
789def _iter_avro_blocks(
790 decoder,
791 header,
792 codec,
793 writer_schema,
794 named_schemas,
795 reader_schema,
796 options,
797):
798 """Return iterator over avro blocks."""
799 sync_marker = header["sync"]
800
801 read_block = BLOCK_READERS.get(codec)
802 if not read_block:
803 raise ValueError(f"Unrecognized codec: {codec}")
804
805 while True:
806 offset = decoder.fo.tell()
807 try:
808 num_block_records = decoder.read_long()
809 except EOFError:
810 return
811
812 block_bytes = read_block(decoder)
813
814 skip_sync(decoder.fo, sync_marker)
815
816 size = decoder.fo.tell() - offset
817
818 yield Block(
819 block_bytes,
820 num_block_records,
821 codec,
822 reader_schema,
823 writer_schema,
824 named_schemas,
825 offset,
826 size,
827 options,
828 )
829
830
831class Block:
832 """An avro block. Will yield records when iterated over
833
834 .. attribute:: num_records
835
836 Number of records in the block
837
838 .. attribute:: writer_schema
839
840 The schema used when writing
841
842 .. attribute:: reader_schema
843
844 The schema used when reading (if provided)
845
846 .. attribute:: offset
847
848 Offset of the block from the beginning of the avro file
849
850 .. attribute:: size
851
852 Size of the block in bytes
853 """
854
855 def __init__(
856 self,
857 bytes_,
858 num_records,
859 codec,
860 reader_schema,
861 writer_schema,
862 named_schemas,
863 offset,
864 size,
865 options,
866 ):
867 self.bytes_ = bytes_
868 self.num_records = num_records
869 self.codec = codec
870 self.reader_schema = reader_schema
871 self.writer_schema = writer_schema
872 self._named_schemas = named_schemas
873 self.offset = offset
874 self.size = size
875 self.options = options
876
877 def __iter__(self):
878 for i in range(self.num_records):
879 yield read_data(
880 BinaryDecoder(self.bytes_),
881 self.writer_schema,
882 self._named_schemas,
883 self.reader_schema,
884 self.options,
885 )
886
887 def __str__(self):
888 return (
889 f"Avro block: {len(self.bytes_)} bytes, "
890 + f"{self.num_records} records, "
891 + f"codec: {self.codec}, position {self.offset}+{self.size}"
892 )
893
894
895class file_reader(Generic[T]):
896 def __init__(
897 self,
898 fo_or_decoder,
899 reader_schema=None,
900 options={},
901 ):
902 if isinstance(fo_or_decoder, AvroJSONDecoder):
903 self.decoder = fo_or_decoder
904 else:
905 # If a decoder was not provided, assume binary
906 self.decoder = BinaryDecoder(fo_or_decoder)
907
908 self._named_schemas = _default_named_schemas()
909 if reader_schema:
910 self.reader_schema = parse_schema(
911 reader_schema, self._named_schemas["reader"], _write_hint=False
912 )
913
914 else:
915 self.reader_schema = None
916 self.options = options
917 self._elems = None
918
919 def _read_header(self):
920 try:
921 self._header = read_data(
922 self.decoder,
923 HEADER_SCHEMA,
924 self._named_schemas,
925 None,
926 self.options,
927 )
928 except EOFError:
929 raise ValueError("cannot read header - is it an avro file?")
930
931 # `meta` values are bytes. So, the actual decoding has to be external.
932 self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}
933
934 self._schema = json.loads(self.metadata["avro.schema"])
935 self.codec = self.metadata.get("avro.codec", "null")
936
937 # Older avro files created before we were more strict about
938 # defaults might have been writen with a bad default. Since we re-parse
939 # the writer schema here, it will now fail. Therefore, if a user
940 # provides a reader schema that passes parsing, we will ignore those
941 # default errors
942 if self.reader_schema is not None:
943 ignore_default_error = True
944 else:
945 ignore_default_error = False
946
947 # Always parse the writer schema since it might have named types that
948 # need to be stored in self._named_types
949 self.writer_schema = parse_schema(
950 self._schema,
951 self._named_schemas["writer"],
952 _write_hint=False,
953 _force=True,
954 _ignore_default_error=ignore_default_error,
955 )
956
957 @property
958 def schema(self):
959 import warnings
960
961 warnings.warn(
962 "The 'schema' attribute is deprecated. Please use 'writer_schema'",
963 DeprecationWarning,
964 )
965 return self._schema
966
967 def __iter__(self) -> Iterator[T]:
968 if not self._elems:
969 raise NotImplementedError
970 return self._elems
971
972 def __next__(self) -> T:
973 return next(self._elems)
974
975
976class reader(file_reader[AvroMessage]):
977 """Iterator over records in an avro file.
978
979 Parameters
980 ----------
981 fo
982 File-like object to read from
983 reader_schema
984 Reader schema
985 return_record_name
986 If true, when reading a union of records, the result will be a tuple
987 where the first value is the name of the record and the second value is
988 the record itself
989 return_record_name_override
990 If true, this will modify the behavior of return_record_name so that
991 the record name is only returned for unions where there is more than
992 one record. For unions that only have one record, this option will make
993 it so that the record is returned by itself, not a tuple with the name.
994 return_named_type
995 If true, when reading a union of named types, the result will be a tuple
996 where the first value is the name of the type and the second value is
997 the record itself
998 NOTE: Using this option will ignore return_record_name and
999 return_record_name_override
1000 return_named_type_override
1001 If true, this will modify the behavior of return_named_type so that
1002 the named type is only returned for unions where there is more than
1003 one named type. For unions that only have one named type, this option
1004 will make it so that the named type is returned by itself, not a tuple
1005 with the name
1006 handle_unicode_errors
1007 Default `strict`. Should be set to a valid string that can be used in
1008 the errors argument of the string decode() function. Examples include
1009 `replace` and `ignore`
1010
1011
1012 Example::
1013
1014 from fastavro import reader
1015 with open('some-file.avro', 'rb') as fo:
1016 avro_reader = reader(fo)
1017 for record in avro_reader:
1018 process_record(record)
1019
1020 The `fo` argument is a file-like object so another common example usage
1021 would use an `io.BytesIO` object like so::
1022
1023 from io import BytesIO
1024 from fastavro import writer, reader
1025
1026 fo = BytesIO()
1027 writer(fo, schema, records)
1028 fo.seek(0)
1029 for record in reader(fo):
1030 process_record(record)
1031
1032 .. attribute:: metadata
1033
1034 Key-value pairs in the header metadata
1035
1036 .. attribute:: codec
1037
1038 The codec used when writing
1039
1040 .. attribute:: writer_schema
1041
1042 The schema used when writing
1043
1044 .. attribute:: reader_schema
1045
1046 The schema used when reading (if provided)
1047 """
1048
1049 def __init__(
1050 self,
1051 fo: Union[IO, AvroJSONDecoder],
1052 reader_schema: Optional[Schema] = None,
1053 return_record_name: bool = False,
1054 return_record_name_override: bool = False,
1055 handle_unicode_errors: str = "strict",
1056 return_named_type: bool = False,
1057 return_named_type_override: bool = False,
1058 ):
1059 options = {
1060 "return_record_name": return_record_name,
1061 "return_record_name_override": return_record_name_override,
1062 "handle_unicode_errors": handle_unicode_errors,
1063 "return_named_type": return_named_type,
1064 "return_named_type_override": return_named_type_override,
1065 }
1066 super().__init__(fo, reader_schema, options)
1067
1068 if isinstance(self.decoder, AvroJSONDecoder):
1069 self.decoder.configure(self.reader_schema, self._named_schemas["reader"])
1070
1071 self.writer_schema = self.reader_schema
1072 self.reader_schema = None
1073 self._named_schemas["writer"] = self._named_schemas["reader"]
1074 self._named_schemas["reader"] = {}
1075
1076 def _elems():
1077 while not self.decoder.done:
1078 yield read_data(
1079 self.decoder,
1080 self.writer_schema,
1081 self._named_schemas,
1082 self.reader_schema,
1083 self.options,
1084 )
1085 self.decoder.drain()
1086
1087 self._elems = _elems()
1088
1089 else:
1090 self._read_header()
1091
1092 self._elems = _iter_avro_records(
1093 self.decoder,
1094 self._header,
1095 self.codec,
1096 self.writer_schema,
1097 self._named_schemas,
1098 self.reader_schema,
1099 self.options,
1100 )
1101
1102
1103class block_reader(file_reader[Block]):
1104 """Iterator over :class:`.Block` in an avro file.
1105
1106 Parameters
1107 ----------
1108 fo
1109 Input stream
1110 reader_schema
1111 Reader schema
1112 return_record_name
1113 If true, when reading a union of records, the result will be a tuple
1114 where the first value is the name of the record and the second value is
1115 the record itself
1116 return_record_name_override
1117 If true, this will modify the behavior of return_record_name so that
1118 the record name is only returned for unions where there is more than
1119 one record. For unions that only have one record, this option will make
1120 it so that the record is returned by itself, not a tuple with the name.
1121 return_named_type
1122 If true, when reading a union of named types, the result will be a tuple
1123 where the first value is the name of the type and the second value is
1124 the record itself
1125 NOTE: Using this option will ignore return_record_name and
1126 return_record_name_override
1127 return_named_type_override
1128 If true, this will modify the behavior of return_named_type so that
1129 the named type is only returned for unions where there is more than
1130 one named type. For unions that only have one named type, this option
1131 will make it so that the named type is returned by itself, not a tuple
1132 with the name
1133 handle_unicode_errors
1134 Default `strict`. Should be set to a valid string that can be used in
1135 the errors argument of the string decode() function. Examples include
1136 `replace` and `ignore`
1137
1138
1139 Example::
1140
1141 from fastavro import block_reader
1142 with open('some-file.avro', 'rb') as fo:
1143 avro_reader = block_reader(fo)
1144 for block in avro_reader:
1145 process_block(block)
1146
1147 .. attribute:: metadata
1148
1149 Key-value pairs in the header metadata
1150
1151 .. attribute:: codec
1152
1153 The codec used when writing
1154
1155 .. attribute:: writer_schema
1156
1157 The schema used when writing
1158
1159 .. attribute:: reader_schema
1160
1161 The schema used when reading (if provided)
1162 """
1163
1164 def __init__(
1165 self,
1166 fo: IO,
1167 reader_schema: Optional[Schema] = None,
1168 return_record_name: bool = False,
1169 return_record_name_override: bool = False,
1170 handle_unicode_errors: str = "strict",
1171 return_named_type: bool = False,
1172 return_named_type_override: bool = False,
1173 ):
1174 options = {
1175 "return_record_name": return_record_name,
1176 "return_record_name_override": return_record_name_override,
1177 "handle_unicode_errors": handle_unicode_errors,
1178 "return_named_type": return_named_type,
1179 "return_named_type_override": return_named_type_override,
1180 }
1181 super().__init__(fo, reader_schema, options)
1182
1183 self._read_header()
1184
1185 self._elems = _iter_avro_blocks(
1186 self.decoder,
1187 self._header,
1188 self.codec,
1189 self.writer_schema,
1190 self._named_schemas,
1191 self.reader_schema,
1192 self.options,
1193 )
1194
1195
1196def schemaless_reader(
1197 fo: IO,
1198 writer_schema: Schema,
1199 reader_schema: Optional[Schema] = None,
1200 return_record_name: bool = False,
1201 return_record_name_override: bool = False,
1202 handle_unicode_errors: str = "strict",
1203 return_named_type: bool = False,
1204 return_named_type_override: bool = False,
1205) -> AvroMessage:
1206 """Reads a single record written using the
1207 :meth:`~fastavro._write_py.schemaless_writer`
1208
1209 Parameters
1210 ----------
1211 fo
1212 Input stream
1213 writer_schema
1214 Schema used when calling schemaless_writer
1215 reader_schema
1216 If the schema has changed since being written then the new schema can
1217 be given to allow for schema migration
1218 return_record_name
1219 If true, when reading a union of records, the result will be a tuple
1220 where the first value is the name of the record and the second value is
1221 the record itself
1222 return_record_name_override
1223 If true, this will modify the behavior of return_record_name so that
1224 the record name is only returned for unions where there is more than
1225 one record. For unions that only have one record, this option will make
1226 it so that the record is returned by itself, not a tuple with the name.
1227 return_named_type
1228 If true, when reading a union of named types, the result will be a tuple
1229 where the first value is the name of the type and the second value is
1230 the record itself
1231 NOTE: Using this option will ignore return_record_name and
1232 return_record_name_override
1233 return_named_type_override
1234 If true, this will modify the behavior of return_named_type so that
1235 the named type is only returned for unions where there is more than
1236 one named type. For unions that only have one named type, this option
1237 will make it so that the named type is returned by itself, not a tuple
1238 with the name
1239 handle_unicode_errors
1240 Default `strict`. Should be set to a valid string that can be used in
1241 the errors argument of the string decode() function. Examples include
1242 `replace` and `ignore`
1243
1244
1245 Example::
1246
1247 parsed_schema = fastavro.parse_schema(schema)
1248 with open('file', 'rb') as fp:
1249 record = fastavro.schemaless_reader(fp, parsed_schema)
1250
1251 Note: The ``schemaless_reader`` can only read a single record.
1252 """
1253 if writer_schema == reader_schema:
1254 # No need for the reader schema if they are the same
1255 reader_schema = None
1256
1257 named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
1258 writer_schema = parse_schema(writer_schema, named_schemas["writer"])
1259
1260 if reader_schema:
1261 reader_schema = parse_schema(reader_schema, named_schemas["reader"])
1262
1263 decoder = BinaryDecoder(fo)
1264
1265 options = {
1266 "return_record_name": return_record_name,
1267 "return_record_name_override": return_record_name_override,
1268 "handle_unicode_errors": handle_unicode_errors,
1269 "return_named_type": return_named_type,
1270 "return_named_type_override": return_named_type_override,
1271 }
1272
1273 return read_data(
1274 decoder,
1275 writer_schema,
1276 named_schemas,
1277 reader_schema,
1278 options,
1279 )
1280
1281
1282def is_avro(path_or_buffer: Union[str, IO]) -> bool:
1283 """Return True if path (or buffer) points to an Avro file. This will only
1284 work for avro files that contain the normal avro schema header like those
1285 create from :func:`~fastavro._write_py.writer`. This function is not intended
1286 to be used with binary data created from
1287 :func:`~fastavro._write_py.schemaless_writer` since that does not include the
1288 avro header.
1289
1290 Parameters
1291 ----------
1292 path_or_buffer
1293 Path to file
1294 """
1295 fp: IO
1296 if isinstance(path_or_buffer, str):
1297 fp = open(path_or_buffer, "rb")
1298 close = True
1299 else:
1300 fp = path_or_buffer
1301 close = False
1302
1303 try:
1304 header = fp.read(len(MAGIC))
1305 return header == MAGIC
1306 finally:
1307 if close:
1308 fp.close()