1"""Python code for reading AVRO files"""
2
3# This code is a modified version of the code at
4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)
6
7import bz2
8import json
9import lzma
10import zlib
11from datetime import datetime, timezone
12from decimal import Context
13from io import BytesIO
14from struct import error as StructError
15from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict
16from warnings import warn
17
18from .io.binary_decoder import BinaryDecoder
19from .io.json_decoder import AvroJSONDecoder
20from .logical_readers import LOGICAL_READERS
21from .schema import (
22 extract_record_type,
23 is_single_record_union,
24 is_single_name_union,
25 extract_logical_type,
26 parse_schema,
27)
28from .types import Schema, AvroMessage, NamedSchemas
29from ._read_common import (
30 SchemaResolutionError,
31 MAGIC,
32 SYNC_SIZE,
33 HEADER_SCHEMA,
34 missing_codec_lib,
35)
36from .const import NAMED_TYPES, AVRO_TYPES
37
38T = TypeVar("T")
39
40decimal_context = Context()
41epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
42epoch_naive = datetime(1970, 1, 1)
43
44
45def _default_named_schemas() -> Dict[str, NamedSchemas]:
46 return {"writer": {}, "reader": {}}
47
48
def match_types(writer_type, reader_type, named_schemas):
    if isinstance(writer_type, list) or isinstance(reader_type, list):
        return True
    if isinstance(writer_type, dict) or isinstance(reader_type, dict):
        try:
            return match_schemas(writer_type, reader_type, named_schemas)
        except SchemaResolutionError:
            return False
    if writer_type == reader_type:
        return True
    # promotion cases
    elif writer_type == "int" and reader_type in ["long", "float", "double"]:
        return True
    elif writer_type == "long" and reader_type in ["float", "double"]:
        return True
    elif writer_type == "float" and reader_type == "double":
        return True
    elif writer_type == "string" and reader_type == "bytes":
        return True
    elif writer_type == "bytes" and reader_type == "string":
        return True
    writer_schema = named_schemas["writer"].get(writer_type)
    reader_schema = named_schemas["reader"].get(reader_type)
    if writer_schema is not None and reader_schema is not None:
        return match_types(writer_schema, reader_schema, named_schemas)
    return False

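
# A hedged, illustrative sketch (not part of the public API): the promotion
# rules above follow the Avro spec, e.g. an "int" writer may be read as
# "long", "float" or "double", and "string"/"bytes" match in either direction.
def _example_match_types():
    named = _default_named_schemas()
    assert match_types("int", "double", named)  # numeric promotion
    assert match_types("bytes", "string", named)  # bytes/string promotion
    assert not match_types("double", "float", named)  # no demotion
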

def match_schemas(w_schema, r_schema, named_schemas):
    error_msg = f"Schema mismatch: {w_schema} is not {r_schema}"
    if isinstance(w_schema, list):
        # If the writer is a union, checks will happen in read_union after the
        # correct schema is known
        return r_schema
    elif isinstance(r_schema, list):
        # If the reader is a union, ensure one of the new schemas is the same
        # as the writer
        for schema in r_schema:
            if match_types(w_schema, schema, named_schemas):
                return schema
        else:
            raise SchemaResolutionError(error_msg)
    else:
        # Check for dicts as primitive types are just strings
        if isinstance(w_schema, dict):
            w_type = w_schema["type"]
        else:
            w_type = w_schema
        if isinstance(r_schema, dict):
            r_type = r_schema["type"]
        else:
            r_type = r_schema

        if w_type == r_type == "map":
            if match_types(w_schema["values"], r_schema["values"], named_schemas):
                return r_schema
        elif w_type == r_type == "array":
            if match_types(w_schema["items"], r_schema["items"], named_schemas):
                return r_schema
        elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
            if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
                raise SchemaResolutionError(
                    f"Schema mismatch: {w_schema} size is different than {r_schema} size"
                )

            w_unqual_name = w_schema["name"].split(".")[-1]
            r_unqual_name = r_schema["name"].split(".")[-1]
            r_aliases = r_schema.get("aliases", [])
            if (
                w_unqual_name == r_unqual_name
                or w_schema["name"] in r_aliases
                or w_unqual_name in r_aliases
            ):
                return r_schema
        elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
            if match_types(w_type, r_schema["name"], named_schemas):
                return r_schema["name"]
        elif match_types(w_type, r_type, named_schemas):
            return r_schema
        raise SchemaResolutionError(error_msg)

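
# Hedged illustration of the resolution above: when the reader schema is a
# union, the first branch compatible with the writer's type is selected.
def _example_match_schemas():
    named = _default_named_schemas()
    assert match_schemas("int", ["null", "long"], named) == "long"
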

def read_null(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_null()


def skip_null(decoder, writer_schema=None, named_schemas=None):
    decoder.read_null()


def read_boolean(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_boolean()


def skip_boolean(decoder, writer_schema=None, named_schemas=None):
    decoder.read_boolean()


def read_int(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_int()


def skip_int(decoder, writer_schema=None, named_schemas=None):
    decoder.read_int()


def read_long(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_long()


def skip_long(decoder, writer_schema=None, named_schemas=None):
    decoder.read_long()


def read_float(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_float()


def skip_float(decoder, writer_schema=None, named_schemas=None):
    decoder.read_float()


def read_double(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_double()


def skip_double(decoder, writer_schema=None, named_schemas=None):
    decoder.read_double()


def read_bytes(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_bytes()


def skip_bytes(decoder, writer_schema=None, named_schemas=None):
    decoder.read_bytes()


def read_utf8(
    decoder,
    writer_schema=None,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    return decoder.read_utf8(
        handle_unicode_errors=options.get("handle_unicode_errors", "strict")
    )

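
# Hedged sketch of the handle_unicode_errors option used above; the byte
# string is hand-encoded for illustration, with two deliberately invalid
# UTF-8 bytes after the zig-zag length prefix.
def _example_handle_unicode_errors():
    data = BytesIO(b"\x04\xff\xff")  # length 2, then invalid bytes
    value = schemaless_reader(data, "string", handle_unicode_errors="replace")
    assert value == "\ufffd\ufffd"
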

def skip_utf8(decoder, writer_schema=None, named_schemas=None):
    decoder.read_utf8()


def read_fixed(
    decoder,
    writer_schema,
    named_schemas=None,
    reader_schema=None,
    options={},
):
    size = writer_schema["size"]
    return decoder.read_fixed(size)


def skip_fixed(decoder, writer_schema, named_schemas=None):
    size = writer_schema["size"]
    decoder.read_fixed(size)


def read_enum(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    symbol = writer_schema["symbols"][decoder.read_enum()]
    if reader_schema and symbol not in reader_schema["symbols"]:
        default = reader_schema.get("default")
        if default:
            return default
        else:
            symlist = reader_schema["symbols"]
            msg = f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
            raise SchemaResolutionError(msg)
    return symbol

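
# Hedged sketch of the enum resolution above: a stub decoder stands in for a
# real BinaryDecoder, and the reader's "default" symbol is substituted when
# the writer's symbol is absent from the reader's symbol list.
def _example_enum_default():
    class _StubDecoder:
        def read_enum(self):
            return 1  # index of the writer's symbol "B"

    writer = {"type": "enum", "name": "E", "symbols": ["A", "B"]}
    reader = {"type": "enum", "name": "E", "symbols": ["A"], "default": "A"}
    assert read_enum(_StubDecoder(), writer, {}, reader) == "A"
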

def skip_enum(decoder, writer_schema, named_schemas):
    decoder.read_enum()


def read_array(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema, options):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                r_schema["items"],
                options,
            )

    else:

        def item_reader(decoder, w_schema, r_schema, options):
            return read_data(
                decoder,
                w_schema["items"],
                named_schemas,
                None,
                options,
            )

    read_items = []

    decoder.read_array_start()

    for _ in decoder.iter_array():
        read_items.append(
            item_reader(
                decoder,
                writer_schema,
                reader_schema,
                options,
            )
        )

    decoder.read_array_end()

    return read_items

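
# Hedged, hand-encoded illustration of the array wire format consumed above:
# a zig-zag block count, that many items, then a zero terminator.
def _example_read_array():
    data = BytesIO(b"\x04\x02\x04\x00")  # count 2, items 1 and 2, end marker
    assert schemaless_reader(data, {"type": "array", "items": "long"}) == [1, 2]
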

def skip_array(decoder, writer_schema, named_schemas):
    decoder.read_array_start()

    for _ in decoder.iter_array():
        skip_data(decoder, writer_schema["items"], named_schemas)

    decoder.read_array_end()


def read_map(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    if reader_schema:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                r_schema["values"],
                options,
            )

    else:

        def item_reader(decoder, w_schema, r_schema):
            return read_data(
                decoder,
                w_schema["values"],
                named_schemas,
                None,
                options,
            )

    read_items = {}

    decoder.read_map_start()

    for _ in decoder.iter_map():
        key = decoder.read_utf8()
        read_items[key] = item_reader(decoder, writer_schema, reader_schema)

    decoder.read_map_end()

    return read_items

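
# Hedged, hand-encoded illustration of the map wire format consumed above:
# a zig-zag block count, then key/value pairs, then a zero terminator.
def _example_read_map():
    data = BytesIO(b"\x02\x02k\x02\x00")  # one entry: key "k", value 1
    assert schemaless_reader(data, {"type": "map", "values": "long"}) == {"k": 1}
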

def skip_map(decoder, writer_schema, named_schemas):
    decoder.read_map_start()

    for _ in decoder.iter_map():
        decoder.read_utf8()
        skip_data(decoder, writer_schema["values"], named_schemas)

    decoder.read_map_end()


def read_union(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    # schema resolution
    index = decoder.read_index()
    idx_schema = writer_schema[index]
    idx_reader_schema = None

    if reader_schema:
        msg = f"schema mismatch: {writer_schema} not found in {reader_schema}"
        # Handle case where the reader schema is just a single type (not union)
        if not isinstance(reader_schema, list):
            if match_types(idx_schema, reader_schema, named_schemas):
                result = read_data(
                    decoder,
                    idx_schema,
                    named_schemas,
                    reader_schema,
                    options,
                )
            else:
                raise SchemaResolutionError(msg)
        else:
            for schema in reader_schema:
                if match_types(idx_schema, schema, named_schemas):
                    idx_reader_schema = schema
                    result = read_data(
                        decoder,
                        idx_schema,
                        named_schemas,
                        schema,
                        options,
                    )
                    break
            else:
                raise SchemaResolutionError(msg)
    else:
        result = read_data(decoder, idx_schema, named_schemas, None, options)

    return_record_name_override = options.get("return_record_name_override")
    return_record_name = options.get("return_record_name")
    return_named_type_override = options.get("return_named_type_override")
    return_named_type = options.get("return_named_type")
    if return_named_type_override and is_single_name_union(writer_schema):
        return result
    elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
        schema_name = (
            idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
        )
        return (schema_name, result)
    elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        schema_name = (
            named_schemas["reader"][idx_reader_schema]["name"]
            if idx_reader_schema
            else named_schemas["writer"][idx_schema]["name"]
        )
        return (schema_name, result)
    elif return_record_name_override and is_single_record_union(writer_schema):
        return result
    elif return_record_name and extract_record_type(idx_schema) == "record":
        schema_name = (
            idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
        )
        return (schema_name, result)
    elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
        # idx_schema is a named type
        schema_name = (
            named_schemas["reader"][idx_reader_schema]["name"]
            if idx_reader_schema
            else named_schemas["writer"][idx_schema]["name"]
        )
        return (schema_name, result)
    else:
        return result

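
# Hedged, hand-encoded illustration of the union wire format consumed above:
# a zig-zag branch index followed by the value encoded with that branch.
def _example_read_union():
    data = BytesIO(b"\x02\x06")  # branch 1 ("long"), then zig-zag encoded 3
    assert schemaless_reader(data, ["null", "long"]) == 3
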

def skip_union(decoder, writer_schema, named_schemas):
    # schema resolution
    index = decoder.read_index()
    skip_data(decoder, writer_schema[index], named_schemas)


def read_record(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Schema Resolution:
     * the ordering of fields may be different: fields are matched by name.
     * schemas for fields with the same name in both records are resolved
       recursively.
     * if the writer's record contains a field with a name not present in the
       reader's record, the writer's value for that field is ignored.
     * if the reader's record schema has a field that contains a default value,
       and writer's schema does not have a field with the same name, then the
       reader should use the default value from its field.
     * if the reader's record schema has a field with no default value, and
       writer's schema does not have a field with the same name, then the
       field's value is unset.
    """
    record = {}
    if reader_schema is None:
        for field in writer_schema["fields"]:
            record[field["name"]] = read_data(
                decoder,
                field["type"],
                named_schemas,
                None,
                options,
            )
    else:
        readers_field_dict = {}
        aliases_field_dict = {}
        for f in reader_schema["fields"]:
            readers_field_dict[f["name"]] = f
            for alias in f.get("aliases", []):
                aliases_field_dict[alias] = f

        for field in writer_schema["fields"]:
            readers_field = readers_field_dict.get(
                field["name"],
                aliases_field_dict.get(field["name"]),
            )
            if readers_field:
                record[readers_field["name"]] = read_data(
                    decoder,
                    field["type"],
                    named_schemas,
                    readers_field["type"],
                    options,
                )
            else:
                skip_data(decoder, field["type"], named_schemas)

        # fill in default values
        if len(readers_field_dict) > len(record):
            writer_fields = [f["name"] for f in writer_schema["fields"]]
            for f_name, field in readers_field_dict.items():
                if f_name not in writer_fields and f_name not in record:
                    if "default" in field:
                        record[field["name"]] = field["default"]
                    else:
                        msg = f"No default value for field {field['name']} in {reader_schema['name']}"
                        raise SchemaResolutionError(msg)

    return record

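
# Hedged example of the schema-resolution rules documented in read_record:
# the writer encoded a single long field, and the reader schema adds a field
# with a default. The byte string and schemas are illustrative only.
def _example_record_defaults():
    writer_schema = {
        "type": "record",
        "name": "Example",
        "fields": [{"name": "a", "type": "long"}],
    }
    reader_schema = {
        "type": "record",
        "name": "Example",
        "fields": [
            {"name": "a", "type": "long"},
            {"name": "b", "type": "long", "default": 7},
        ],
    }
    # b"\x02" is the zig-zag encoding of 1 for field "a"
    record = schemaless_reader(BytesIO(b"\x02"), writer_schema, reader_schema)
    assert record == {"a": 1, "b": 7}
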

def skip_record(decoder, writer_schema, named_schemas):
    for field in writer_schema["fields"]:
        skip_data(decoder, field["type"], named_schemas)


READERS = {
    "null": read_null,
    "boolean": read_boolean,
    "string": read_utf8,
    "int": read_int,
    "long": read_long,
    "float": read_float,
    "double": read_double,
    "bytes": read_bytes,
    "fixed": read_fixed,
    "enum": read_enum,
    "array": read_array,
    "map": read_map,
    "union": read_union,
    "error_union": read_union,
    "record": read_record,
    "error": read_record,
    "request": read_record,
}

SKIPS = {
    "null": skip_null,
    "boolean": skip_boolean,
    "string": skip_utf8,
    "int": skip_int,
    "long": skip_long,
    "float": skip_float,
    "double": skip_double,
    "bytes": skip_bytes,
    "fixed": skip_fixed,
    "enum": skip_enum,
    "array": skip_array,
    "map": skip_map,
    "union": skip_union,
    "error_union": skip_union,
    "record": skip_record,
    "error": skip_record,
    "request": skip_record,
}


def maybe_promote(data, writer_type, reader_type):
    if writer_type == "int":
        # No need to promote to long since they are the same type in Python
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "long":
        if reader_type == "float" or reader_type == "double":
            return float(data)
    if writer_type == "string" and reader_type == "bytes":
        return data.encode()
    if writer_type == "bytes" and reader_type == "string":
        return data.decode()
    return data

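
# Hedged sketch of the promotions above: integral writers become floats for
# float/double readers, and string/bytes convert to the reader's type.
def _example_maybe_promote():
    assert maybe_promote(1, "int", "double") == 1.0
    assert maybe_promote("hi", "string", "bytes") == b"hi"
    assert maybe_promote(b"hi", "bytes", "string") == "hi"
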

def read_data(
    decoder,
    writer_schema,
    named_schemas,
    reader_schema=None,
    options={},
):
    """Read data from file object according to schema."""

    record_type = extract_record_type(writer_schema)

    if reader_schema:
        reader_schema = match_schemas(
            writer_schema,
            reader_schema,
            named_schemas,
        )

    reader_fn = READERS.get(record_type)
    if reader_fn:
        try:
            data = reader_fn(
                decoder,
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )
        except StructError:
            raise EOFError(f"cannot read {record_type} from {decoder.fo}")

        if "logicalType" in writer_schema:
            logical_type = extract_logical_type(writer_schema)
            fn = LOGICAL_READERS.get(logical_type)
            if fn:
                return fn(data, writer_schema, reader_schema)

        if reader_schema is not None:
            return maybe_promote(data, record_type, extract_record_type(reader_schema))
        else:
            return data
    else:
        return read_data(
            decoder,
            named_schemas["writer"][record_type],
            named_schemas,
            named_schemas["reader"].get(reader_schema),
            options,
        )


def skip_data(decoder, writer_schema, named_schemas):
    record_type = extract_record_type(writer_schema)

    reader_fn = SKIPS.get(record_type)
    if reader_fn:
        reader_fn(decoder, writer_schema, named_schemas)
    else:
        skip_data(decoder, named_schemas["writer"][record_type], named_schemas)


def skip_sync(fo, sync_marker):
    """Skip an expected sync marker, complaining if it doesn't match"""
    if fo.read(SYNC_SIZE) != sync_marker:
        raise ValueError("expected sync marker not found")


def null_read_block(decoder):
    """Read block in "null" codec."""
    return BytesIO(decoder.read_bytes())


def deflate_read_block(decoder):
    """Read block in "deflate" codec."""
    data = decoder.read_bytes()
    # -15 is the log of the window size; negative indicates "raw" (no
    # zlib headers) decompression. See zlib.h.
    return BytesIO(zlib.decompressobj(-15).decompress(data))

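
# Hedged round-trip check of the raw-deflate handling above: a negative wbits
# value produces the header-less stream this codec expects.
def _example_raw_deflate():
    compressor = zlib.compressobj(9, zlib.DEFLATED, -15)
    payload = compressor.compress(b"avro") + compressor.flush()
    assert zlib.decompressobj(-15).decompress(payload) == b"avro"
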

def bzip2_read_block(decoder):
    """Read block in "bzip2" codec."""
    data = decoder.read_bytes()
    return BytesIO(bz2.decompress(data))


def xz_read_block(decoder):
    """Read block in "xz" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lzma.decompress(data))


BLOCK_READERS = {
    "null": null_read_block,
    "deflate": deflate_read_block,
    "bzip2": bzip2_read_block,
    "xz": xz_read_block,
}


def snappy_read_block(decoder):
    """Read block in "snappy" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length - 4)
    # The last 4 bytes are a CRC32 checksum of the uncompressed data; skip it
    decoder.read_fixed(4)
    return BytesIO(snappy_decompress(data))


try:
    from cramjam import snappy

    snappy_decompress = snappy.decompress_raw
except ImportError:
    try:
        import snappy

        snappy_decompress = snappy.decompress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_READERS["snappy"] = snappy_read_block
else:
    BLOCK_READERS["snappy"] = snappy_read_block


def zstandard_read_block(decoder):
    """Read block in "zstandard" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(data))


try:
    import zstandard
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block


def lz4_read_block(decoder):
    """Read block in "lz4" codec."""
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(data))


try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block


def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            block_count = decoder.read_long()
        except EOFError:
            return

        block_fo = read_block(decoder)

        for _ in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )

        skip_sync(decoder.fo, sync_marker)

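
# Container-file layout consumed by _iter_avro_records above and
# _iter_avro_blocks below, per the Avro spec: after the header, the file is a
# sequence of blocks, each one a record count (long), the codec-compressed
# payload, and the 16-byte sync marker, repeated until EOF.
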

def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except EOFError:
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            options,
        )


class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
    """

    def __init__(
        self,
        bytes_,
        num_records,
        codec,
        reader_schema,
        writer_schema,
        named_schemas,
        offset,
        size,
        options,
    ):
        self.bytes_ = bytes_
        self.num_records = num_records
        self.codec = codec
        self.reader_schema = reader_schema
        self.writer_schema = writer_schema
        self._named_schemas = named_schemas
        self.offset = offset
        self.size = size
        self.options = options

    def __iter__(self):
        for _ in range(self.num_records):
            yield read_data(
                BinaryDecoder(self.bytes_),
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )

    def __str__(self):
        # bytes_ is a BytesIO, which has no len(); measure its buffer instead
        return (
            f"Avro block: {self.bytes_.getbuffer().nbytes} bytes, "
            + f"{self.num_records} records, "
            + f"codec: {self.codec}, position {self.offset}+{self.size}"
        )


class file_reader(Generic[T]):
    def __init__(
        self,
        fo_or_decoder,
        reader_schema=None,
        options={},
    ):
        if isinstance(fo_or_decoder, AvroJSONDecoder):
            self.decoder = fo_or_decoder
        else:
            # If a decoder was not provided, assume binary
            self.decoder = BinaryDecoder(fo_or_decoder)

        self._named_schemas = _default_named_schemas()
        if reader_schema:
            self.reader_schema = parse_schema(
                reader_schema, self._named_schemas["reader"], _write_hint=False
            )
        else:
            self.reader_schema = None
        self.options = options
        self._elems = None

    def _read_header(self):
        try:
            self._header = read_data(
                self.decoder,
                HEADER_SCHEMA,
                self._named_schemas,
                None,
                self.options,
            )
        except EOFError:
            raise ValueError("cannot read header - is it an avro file?")

        # `meta` values are bytes. So, the actual decoding has to be external.
        self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}

        self._schema = json.loads(self.metadata["avro.schema"])
        self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about
        # defaults might have been written with a bad default. Since we
        # re-parse the writer schema here, it will now fail. Therefore, if a
        # user provides a reader schema that passes parsing, we will ignore
        # those default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_schemas
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)


class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.options,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )


class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.options,
        )


def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    handle_unicode_errors: str = "strict",
    return_named_type: bool = False,
    return_named_type_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    decoder = BinaryDecoder(fo)

    options = {
        "return_record_name": return_record_name,
        "return_record_name_override": return_record_name_override,
        "handle_unicode_errors": handle_unicode_errors,
        "return_named_type": return_named_type,
        "return_named_type_override": return_named_type_override,
    }

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        options,
    )


def is_avro(path_or_buffer: Union[str, IO]) -> bool:
    """Return True if path (or buffer) points to an Avro file. This will only
    work for avro files that contain the normal avro schema header like those
    created from :func:`~fastavro._write_py.writer`. This function is not
    intended to be used with binary data created from
    :func:`~fastavro._write_py.schemaless_writer` since that does not include
    the avro header.

    Parameters
    ----------
    path_or_buffer
        Path to file
    """
    fp: IO
    if isinstance(path_or_buffer, str):
        fp = open(path_or_buffer, "rb")
        close = True
    else:
        fp = path_or_buffer
        close = False

    try:
        header = fp.read(len(MAGIC))
        return header == MAGIC
    finally:
        if close:
            fp.close()
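

# Hedged usage sketch for is_avro: only the magic bytes at the start of the
# stream are inspected, so a bare MAGIC prefix passes the check. Illustrative
# only; not part of the module's public API.
def _example_is_avro():
    assert is_avro(BytesIO(MAGIC))
    assert not is_avro(BytesIO(b"not an avro file"))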