1"""Python code for reading AVRO files"""
2
3# This code is a modified version of the code at
4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)
6
7import bz2
8import json
9import lzma
10import zlib
11from datetime import datetime, timezone
12from decimal import Context
13from io import BytesIO
14from struct import error as StructError
15from typing import IO, Union, Optional, Generic, TypeVar, Iterator, Dict
16from warnings import warn
17
18from .io.binary_decoder import BinaryDecoder
19from .io.json_decoder import AvroJSONDecoder
20from .logical_readers import LOGICAL_READERS
21from .schema import (
22 extract_record_type,
23 is_single_record_union,
24 is_single_name_union,
25 extract_logical_type,
26 parse_schema,
27)
28from .types import Schema, AvroMessage, NamedSchemas
29from ._read_common import (
30 SchemaResolutionError,
31 MAGIC,
32 SYNC_SIZE,
33 HEADER_SCHEMA,
34 missing_codec_lib,
35)
36from .const import NAMED_TYPES, AVRO_TYPES
37
38T = TypeVar("T")
39
40decimal_context = Context()
41epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
42epoch_naive = datetime(1970, 1, 1)
43
44
45def _default_named_schemas() -> Dict[str, NamedSchemas]:
46 return {"writer": {}, "reader": {}}
47
48
49def match_types(writer_type, reader_type, named_schemas):
50 if isinstance(writer_type, list) or isinstance(reader_type, list):
51 return True
52 if isinstance(writer_type, dict) or isinstance(reader_type, dict):
53 matching_schema = match_schemas(
54 writer_type, reader_type, named_schemas, raise_on_error=False
55 )
56 return matching_schema is not None
57 if writer_type == reader_type:
58 return True
59 # promotion cases
60 elif writer_type == "int" and reader_type in ["long", "float", "double"]:
61 return True
62 elif writer_type == "long" and reader_type in ["float", "double"]:
63 return True
64 elif writer_type == "float" and reader_type == "double":
65 return True
66 elif writer_type == "string" and reader_type == "bytes":
67 return True
68 elif writer_type == "bytes" and reader_type == "string":
69 return True
70 writer_schema = named_schemas["writer"].get(writer_type)
71 reader_schema = named_schemas["reader"].get(reader_type)
72 if writer_schema is not None and reader_schema is not None:
73 return match_types(writer_schema, reader_schema, named_schemas)
74 return False
75
76
77def match_schemas(w_schema, r_schema, named_schemas, raise_on_error=True):
78 if isinstance(w_schema, list):
79 # If the writer is a union, checks will happen in read_union after the
80 # correct schema is known
81 return r_schema
82 elif isinstance(r_schema, list):
83 # If the reader is a union, ensure one of the new schemas is the same
84 # as the writer
85 for schema in r_schema:
86 if match_types(w_schema, schema, named_schemas):
87 return schema
88 else:
89 if raise_on_error:
90 raise SchemaResolutionError(
91 f"Schema mismatch: {w_schema} is not {r_schema}"
92 )
93 else:
94 return None
95 else:
96 # Check for dicts as primitive types are just strings
97 if isinstance(w_schema, dict):
98 w_type = w_schema["type"]
99 else:
100 w_type = w_schema
101 if isinstance(r_schema, dict):
102 r_type = r_schema["type"]
103 else:
104 r_type = r_schema
105
106 if w_type == r_type == "map":
107 if match_types(w_schema["values"], r_schema["values"], named_schemas):
108 return r_schema
109 elif w_type == r_type == "array":
110 if match_types(w_schema["items"], r_schema["items"], named_schemas):
111 return r_schema
112 elif w_type in NAMED_TYPES and r_type in NAMED_TYPES:
113 if w_type == r_type == "fixed" and w_schema["size"] != r_schema["size"]:
114 if raise_on_error:
115 raise SchemaResolutionError(
116 f"Schema mismatch: {w_schema} size is different than {r_schema} size"
117 )
118 else:
119 return None
120
121 w_unqual_name = w_schema["name"].split(".")[-1]
122 r_unqual_name = r_schema["name"].split(".")[-1]
123 r_aliases = r_schema.get("aliases", [])
124 if (
125 w_unqual_name == r_unqual_name
126 or w_schema["name"] in r_aliases
127 or w_unqual_name in r_aliases
128 ):
129 return r_schema
130 elif w_type not in AVRO_TYPES and r_type in NAMED_TYPES:
131 if match_types(w_type, r_schema["name"], named_schemas):
132 return r_schema["name"]
133 elif match_types(w_type, r_type, named_schemas):
134 return r_schema
135 if raise_on_error:
136 raise SchemaResolutionError(
137 f"Schema mismatch: {w_schema} is not {r_schema}"
138 )
139 else:
140 return None
141
142
143def read_null(
144 decoder,
145 writer_schema=None,
146 named_schemas=None,
147 reader_schema=None,
148 options={},
149):
150 return decoder.read_null()
151
152
153def skip_null(decoder, writer_schema=None, named_schemas=None):
154 decoder.read_null()
155
156
157def read_boolean(
158 decoder,
159 writer_schema=None,
160 named_schemas=None,
161 reader_schema=None,
162 options={},
163):
164 return decoder.read_boolean()
165
166
167def skip_boolean(decoder, writer_schema=None, named_schemas=None):
168 decoder.read_boolean()
169
170
171def read_int(
172 decoder,
173 writer_schema=None,
174 named_schemas=None,
175 reader_schema=None,
176 options={},
177):
178 return decoder.read_int()
179
180
181def skip_int(decoder, writer_schema=None, named_schemas=None):
182 decoder.read_int()
183
184
185def read_long(
186 decoder,
187 writer_schema=None,
188 named_schemas=None,
189 reader_schema=None,
190 options={},
191):
192 return decoder.read_long()
193
194
195def skip_long(decoder, writer_schema=None, named_schemas=None):
196 decoder.read_long()
197
198
199def read_float(
200 decoder,
201 writer_schema=None,
202 named_schemas=None,
203 reader_schema=None,
204 options={},
205):
206 return decoder.read_float()
207
208
209def skip_float(decoder, writer_schema=None, named_schemas=None):
210 decoder.read_float()
211
212
213def read_double(
214 decoder,
215 writer_schema=None,
216 named_schemas=None,
217 reader_schema=None,
218 options={},
219):
220 return decoder.read_double()
221
222
223def skip_double(decoder, writer_schema=None, named_schemas=None):
224 decoder.read_double()
225
226
227def read_bytes(
228 decoder,
229 writer_schema=None,
230 named_schemas=None,
231 reader_schema=None,
232 options={},
233):
234 return decoder.read_bytes()
235
236
237def skip_bytes(decoder, writer_schema=None, named_schemas=None):
238 decoder.read_bytes()
239
240
241def read_utf8(
242 decoder,
243 writer_schema=None,
244 named_schemas=None,
245 reader_schema=None,
246 options={},
247):
248 return decoder.read_utf8(
249 handle_unicode_errors=options.get("handle_unicode_errors", "strict")
250 )
251
252
253def skip_utf8(decoder, writer_schema=None, named_schemas=None):
254 decoder.read_utf8()
255
256
257def read_fixed(
258 decoder,
259 writer_schema,
260 named_schemas=None,
261 reader_schema=None,
262 options={},
263):
264 size = writer_schema["size"]
265 return decoder.read_fixed(size)
266
267
268def skip_fixed(decoder, writer_schema, named_schemas=None):
269 size = writer_schema["size"]
270 decoder.read_fixed(size)
271
272
273def read_enum(
274 decoder,
275 writer_schema,
276 named_schemas,
277 reader_schema=None,
278 options={},
279):
280 symbol = writer_schema["symbols"][decoder.read_enum()]
281 if reader_schema and symbol not in reader_schema["symbols"]:
282 default = reader_schema.get("default")
283 if default:
284 return default
285 else:
286 symlist = reader_schema["symbols"]
287 msg = f"{symbol} not found in reader symbol list {reader_schema['name']}, known symbols: {symlist}"
288 raise SchemaResolutionError(msg)
289 return symbol
290
291
292def skip_enum(decoder, writer_schema, named_schemas):
293 decoder.read_enum()
294
295
296def read_array(
297 decoder,
298 writer_schema,
299 named_schemas,
300 reader_schema=None,
301 options={},
302):
303 if reader_schema:
304
305 def item_reader(decoder, w_schema, r_schema, options):
306 return read_data(
307 decoder,
308 w_schema["items"],
309 named_schemas,
310 r_schema["items"],
311 options,
312 )
313
314 else:
315
316 def item_reader(decoder, w_schema, r_schema, options):
317 return read_data(
318 decoder,
319 w_schema["items"],
320 named_schemas,
321 None,
322 options,
323 )
324
325 read_items = []
326
327 decoder.read_array_start()
328
329 for item in decoder.iter_array():
330 read_items.append(
331 item_reader(
332 decoder,
333 writer_schema,
334 reader_schema,
335 options,
336 )
337 )
338
339 decoder.read_array_end()
340
341 return read_items
342
343
344def skip_array(decoder, writer_schema, named_schemas):
345 decoder.read_array_start()
346
347 for item in decoder.iter_array():
348 skip_data(decoder, writer_schema["items"], named_schemas)
349
350 decoder.read_array_end()
351
352
353def read_map(
354 decoder,
355 writer_schema,
356 named_schemas,
357 reader_schema=None,
358 options={},
359):
360 if reader_schema:
361
362 def item_reader(decoder, w_schema, r_schema):
363 return read_data(
364 decoder,
365 w_schema["values"],
366 named_schemas,
367 r_schema["values"],
368 options,
369 )
370
371 else:
372
373 def item_reader(decoder, w_schema, r_schema):
374 return read_data(
375 decoder,
376 w_schema["values"],
377 named_schemas,
378 None,
379 options,
380 )
381
382 read_items = {}
383
384 decoder.read_map_start()
385
386 for item in decoder.iter_map():
387 key = decoder.read_utf8()
388 read_items[key] = item_reader(decoder, writer_schema, reader_schema)
389
390 decoder.read_map_end()
391
392 return read_items
393
394
395def skip_map(decoder, writer_schema, named_schemas):
396 decoder.read_map_start()
397
398 for item in decoder.iter_map():
399 decoder.read_utf8()
400 skip_data(decoder, writer_schema["values"], named_schemas)
401
402 decoder.read_map_end()
403
404
405def read_union(
406 decoder,
407 writer_schema,
408 named_schemas,
409 reader_schema=None,
410 options={},
411):
412 # schema resolution
413 index = decoder.read_index()
414 idx_schema = writer_schema[index]
415 idx_reader_schema = None
416
417 if reader_schema:
418 # Handle case where the reader schema is just a single type (not union)
419 if not isinstance(reader_schema, list):
420 if match_types(idx_schema, reader_schema, named_schemas):
421 result = read_data(
422 decoder,
423 idx_schema,
424 named_schemas,
425 reader_schema,
426 options,
427 )
428 else:
429 raise SchemaResolutionError(
430 f"schema mismatch: {writer_schema} not found in {reader_schema}"
431 )
432 else:
433 for schema in reader_schema:
434 if match_types(idx_schema, schema, named_schemas):
435 idx_reader_schema = schema
436 result = read_data(
437 decoder,
438 idx_schema,
439 named_schemas,
440 schema,
441 options,
442 )
443 break
444 else:
445 raise SchemaResolutionError(
446 f"schema mismatch: {writer_schema} not found in {reader_schema}"
447 )
448 else:
449 result = read_data(decoder, idx_schema, named_schemas, None, options)
450
451 return_record_name_override = options.get("return_record_name_override")
452 return_record_name = options.get("return_record_name")
453 return_named_type_override = options.get("return_named_type_override")
454 return_named_type = options.get("return_named_type")
455 if return_named_type_override and is_single_name_union(writer_schema):
456 return result
457 elif return_named_type and extract_record_type(idx_schema) in NAMED_TYPES:
458 schema_name = (
459 idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
460 )
461 return (schema_name, result)
462 elif return_named_type and extract_record_type(idx_schema) not in AVRO_TYPES:
463 # idx_schema is a named type
464 schema_name = (
465 named_schemas["reader"][idx_reader_schema]["name"]
466 if idx_reader_schema
467 else named_schemas["writer"][idx_schema]["name"]
468 )
469 return (schema_name, result)
470 elif return_record_name_override and is_single_record_union(writer_schema):
471 return result
472 elif return_record_name and extract_record_type(idx_schema) == "record":
473 schema_name = (
474 idx_reader_schema["name"] if idx_reader_schema else idx_schema["name"]
475 )
476 return (schema_name, result)
477 elif return_record_name and extract_record_type(idx_schema) not in AVRO_TYPES:
478 # idx_schema is a named type
479 schema_name = (
480 named_schemas["reader"][idx_reader_schema]["name"]
481 if idx_reader_schema
482 else named_schemas["writer"][idx_schema]["name"]
483 )
484 return (schema_name, result)
485 else:
486 return result
487
488
489def skip_union(decoder, writer_schema, named_schemas):
490 # schema resolution
491 index = decoder.read_index()
492 skip_data(decoder, writer_schema[index], named_schemas)
493
494
495def read_record(
496 decoder,
497 writer_schema,
498 named_schemas,
499 reader_schema=None,
500 options={},
501):
502 """A record is encoded by encoding the values of its fields in the order
503 that they are declared. In other words, a record is encoded as just the
504 concatenation of the encodings of its fields. Field values are encoded per
505 their schema.
506
507 Schema Resolution:
508 * the ordering of fields may be different: fields are matched by name.
509 * schemas for fields with the same name in both records are resolved
510 recursively.
511 * if the writer's record contains a field with a name not present in the
512 reader's record, the writer's value for that field is ignored.
513 * if the reader's record schema has a field that contains a default value,
514 and writer's schema does not have a field with the same name, then the
515 reader should use the default value from its field.
516 * if the reader's record schema has a field with no default value, and
517 writer's schema does not have a field with the same name, then the
518 field's value is unset.
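
    For illustration, here is a sketch of those rules with hypothetical
    writer and reader schemas::

        writer = {"type": "record", "name": "User", "fields": [
            {"name": "name", "type": "string"},
            {"name": "age", "type": "int"},
        ]}
        reader = {"type": "record", "name": "User", "fields": [
            {"name": "name", "type": "string"},
            {"name": "email", "type": ["null", "string"], "default": None},
        ]}

        # "name" is matched by name and read; "age" is skipped because the
        # reader does not have it; "email" is filled from its default since
        # the writer never wrote it.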
519 """
520 record = {}
521 if reader_schema is None:
522 for field in writer_schema["fields"]:
523 record[field["name"]] = read_data(
524 decoder,
525 field["type"],
526 named_schemas,
527 None,
528 options,
529 )
530 else:
531 readers_field_dict = {}
532 aliases_field_dict = {}
533 for f in reader_schema["fields"]:
534 readers_field_dict[f["name"]] = f
535 for alias in f.get("aliases", []):
536 aliases_field_dict[alias] = f
537
538 for field in writer_schema["fields"]:
539 readers_field = readers_field_dict.get(
540 field["name"],
541 aliases_field_dict.get(field["name"]),
542 )
543 if readers_field:
544 readers_field_name = readers_field["name"]
545 record[readers_field_name] = read_data(
546 decoder,
547 field["type"],
548 named_schemas,
549 readers_field["type"],
550 options,
551 )
552 del readers_field_dict[readers_field_name]
553 else:
554 skip_data(decoder, field["type"], named_schemas)
555
556 # fill in default values
557 for f_name, field in readers_field_dict.items():
558 if "default" in field:
559 record[field["name"]] = field["default"]
560 else:
561 msg = f"No default value for field {field['name']} in {reader_schema['name']}"
562 raise SchemaResolutionError(msg)
563
564 return record
565
566
567def skip_record(decoder, writer_schema, named_schemas):
568 for field in writer_schema["fields"]:
569 skip_data(decoder, field["type"], named_schemas)
570
571
572READERS = {
573 "null": read_null,
574 "boolean": read_boolean,
575 "string": read_utf8,
576 "int": read_int,
577 "long": read_long,
578 "float": read_float,
579 "double": read_double,
580 "bytes": read_bytes,
581 "fixed": read_fixed,
582 "enum": read_enum,
583 "array": read_array,
584 "map": read_map,
585 "union": read_union,
586 "error_union": read_union,
587 "record": read_record,
588 "error": read_record,
589 "request": read_record,
590}
591
592SKIPS = {
593 "null": skip_null,
594 "boolean": skip_boolean,
595 "string": skip_utf8,
596 "int": skip_int,
597 "long": skip_long,
598 "float": skip_float,
599 "double": skip_double,
600 "bytes": skip_bytes,
601 "fixed": skip_fixed,
602 "enum": skip_enum,
603 "array": skip_array,
604 "map": skip_map,
605 "union": skip_union,
606 "error_union": skip_union,
607 "record": skip_record,
608 "error": skip_record,
609 "request": skip_record,
610}
611
612
613def maybe_promote(data, writer_type, reader_type):
614 if writer_type == "int":
615 # No need to promote to long since they are the same type in Python
616 if reader_type == "float" or reader_type == "double":
617 return float(data)
618 if writer_type == "long":
619 if reader_type == "float" or reader_type == "double":
620 return float(data)
621 if writer_type == "string" and reader_type == "bytes":
622 return data.encode()
623 if writer_type == "bytes" and reader_type == "string":
624 return data.decode()
625 return data
626
627
628def read_data(
629 decoder,
630 writer_schema,
631 named_schemas,
632 reader_schema=None,
633 options={},
634):
635 """Read data from file object according to schema."""
636
637 record_type = extract_record_type(writer_schema)
638
639 if reader_schema:
640 reader_schema = match_schemas(
641 writer_schema,
642 reader_schema,
643 named_schemas,
644 )
645
646 reader_fn = READERS.get(record_type)
647 if reader_fn:
648 try:
649 data = reader_fn(
650 decoder,
651 writer_schema,
652 named_schemas,
653 reader_schema,
654 options,
655 )
656 except StructError:
657 raise EOFError(f"cannot read {record_type} from {decoder.fo}")
658
659 if "logicalType" in writer_schema:
660 logical_type = extract_logical_type(writer_schema)
661 fn = LOGICAL_READERS.get(logical_type)
662 if fn:
663 return fn(data, writer_schema, reader_schema)
664
665 if reader_schema is not None:
666 return maybe_promote(data, record_type, extract_record_type(reader_schema))
667 else:
668 return data
669 else:
670 return read_data(
671 decoder,
672 named_schemas["writer"][record_type],
673 named_schemas,
674 named_schemas["reader"].get(reader_schema),
675 options,
676 )
677
678
679def skip_data(decoder, writer_schema, named_schemas):
680 record_type = extract_record_type(writer_schema)
681
682 reader_fn = SKIPS.get(record_type)
683 if reader_fn:
684 reader_fn(decoder, writer_schema, named_schemas)
685 else:
686 skip_data(decoder, named_schemas["writer"][record_type], named_schemas)
687
688
689def skip_sync(fo, sync_marker):
690 """Skip an expected sync marker, complaining if it doesn't match"""
691 if fo.read(SYNC_SIZE) != sync_marker:
692 raise ValueError("expected sync marker not found")
693
694
695def null_read_block(decoder):
696 """Read block in "null" codec."""
697 return BytesIO(decoder.read_bytes())
698
699
700def deflate_read_block(decoder):
701 """Read block in "deflate" codec."""
702 data = decoder.read_bytes()
703 # -15 is the log of the window size; negative indicates "raw" (no
704 # zlib headers) decompression. See zlib.h.
705 return BytesIO(zlib.decompressobj(-15).decompress(data))
706
707
708def bzip2_read_block(decoder):
709 """Read block in "bzip2" codec."""
710 data = decoder.read_bytes()
711 return BytesIO(bz2.decompress(data))
712
713
714def xz_read_block(decoder):
715 length = read_long(decoder)
716 data = decoder.read_fixed(length)
717 return BytesIO(lzma.decompress(data))
718
719
720BLOCK_READERS = {
721 "null": null_read_block,
722 "deflate": deflate_read_block,
723 "bzip2": bzip2_read_block,
724 "xz": xz_read_block,
725}
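
# The optional codecs below register themselves into BLOCK_READERS when their
# third-party library imports successfully. A custom codec could be hooked in
# the same way (a sketch; `my_decompress` is a hypothetical decompression
# function, not part of fastavro):
#
#     def my_read_block(decoder):
#         return BytesIO(my_decompress(decoder.read_bytes()))
#
#     BLOCK_READERS["mycodec"] = my_read_block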


def snappy_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length - 4)
    decoder.read_fixed(4)  # CRC
    return BytesIO(snappy_decompress(data))


try:
    from cramjam import snappy

    snappy_decompress = snappy.decompress_raw
except ImportError:
    try:
        import snappy

        snappy_decompress = snappy.decompress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_READERS["snappy"] = missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_READERS["snappy"] = snappy_read_block
else:
    BLOCK_READERS["snappy"] = snappy_read_block


def zstandard_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(zstandard.ZstdDecompressor().decompressobj().decompress(data))


try:
    import zstandard
except ImportError:
    BLOCK_READERS["zstandard"] = missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_READERS["zstandard"] = zstandard_read_block


def lz4_read_block(decoder):
    length = read_long(decoder)
    data = decoder.read_fixed(length)
    return BytesIO(lz4.block.decompress(data))


try:
    import lz4.block
except ImportError:
    BLOCK_READERS["lz4"] = missing_codec_lib("lz4", "lz4")
else:
    BLOCK_READERS["lz4"] = lz4_read_block


def _iter_avro_records(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro records."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    block_count = 0
    while True:
        try:
            block_count = decoder.read_long()
        except EOFError:
            return

        block_fo = read_block(decoder)

        for i in range(block_count):
            yield read_data(
                BinaryDecoder(block_fo),
                writer_schema,
                named_schemas,
                reader_schema,
                options,
            )

        skip_sync(decoder.fo, sync_marker)


def _iter_avro_blocks(
    decoder,
    header,
    codec,
    writer_schema,
    named_schemas,
    reader_schema,
    options,
):
    """Return iterator over avro blocks."""
    sync_marker = header["sync"]

    read_block = BLOCK_READERS.get(codec)
    if not read_block:
        raise ValueError(f"Unrecognized codec: {codec}")

    while True:
        offset = decoder.fo.tell()
        try:
            num_block_records = decoder.read_long()
        except EOFError:
            return

        block_bytes = read_block(decoder)

        skip_sync(decoder.fo, sync_marker)

        size = decoder.fo.tell() - offset

        yield Block(
            block_bytes,
            num_block_records,
            codec,
            reader_schema,
            writer_schema,
            named_schemas,
            offset,
            size,
            options,
        )


class Block:
    """An avro block. Will yield records when iterated over

    .. attribute:: num_records

        Number of records in the block

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)

    .. attribute:: offset

        Offset of the block from the beginning of the avro file

    .. attribute:: size

        Size of the block in bytes
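
    Example (a sketch; assumes ``some-file.avro`` was written by
    :func:`~fastavro._write_py.writer`)::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            for block in block_reader(fo):
                print(block)  # "Avro block: N bytes, M records, ..."
                for record in block:
                    process_record(record)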
885 """
886
887 def __init__(
888 self,
889 bytes_,
890 num_records,
891 codec,
892 reader_schema,
893 writer_schema,
894 named_schemas,
895 offset,
896 size,
897 options,
898 ):
899 self.bytes_ = bytes_
900 self.num_records = num_records
901 self.codec = codec
902 self.reader_schema = reader_schema
903 self.writer_schema = writer_schema
904 self._named_schemas = named_schemas
905 self.offset = offset
906 self.size = size
907 self.options = options
908
909 def __iter__(self):
910 for i in range(self.num_records):
911 yield read_data(
912 BinaryDecoder(self.bytes_),
913 self.writer_schema,
914 self._named_schemas,
915 self.reader_schema,
916 self.options,
917 )
918
919 def __str__(self):
920 return (
921 f"Avro block: {len(self.bytes_)} bytes, "
922 + f"{self.num_records} records, "
923 + f"codec: {self.codec}, position {self.offset}+{self.size}"
924 )
925
926
927class file_reader(Generic[T]):
928 def __init__(
929 self,
930 fo_or_decoder,
931 reader_schema=None,
932 options={},
933 ):
934 if isinstance(fo_or_decoder, AvroJSONDecoder):
935 self.decoder = fo_or_decoder
936 else:
937 # If a decoder was not provided, assume binary
938 self.decoder = BinaryDecoder(fo_or_decoder)
939
940 self._named_schemas = _default_named_schemas()
941 if reader_schema:
942 self.reader_schema = parse_schema(
943 reader_schema, self._named_schemas["reader"], _write_hint=False
944 )
945
946 else:
947 self.reader_schema = None
948 self.options = options
949 self._elems = None
950
951 def _read_header(self):
952 try:
953 self._header = read_data(
954 self.decoder,
955 HEADER_SCHEMA,
956 self._named_schemas,
957 None,
958 self.options,
959 )
960 except EOFError:
961 raise ValueError("cannot read header - is it an avro file?")
962
963 # `meta` values are bytes. So, the actual decoding has to be external.
964 self.metadata = {k: v.decode() for k, v in self._header["meta"].items()}
965
966 self._schema = json.loads(self.metadata["avro.schema"])
967 self.codec = self.metadata.get("avro.codec", "null")

        # Older avro files created before we were more strict about defaults
        # might have been written with a bad default. Since we re-parse the
        # writer schema here, parsing would now fail. Therefore, if a user
        # provides a reader schema that passes parsing, we will ignore those
        # default errors
        if self.reader_schema is not None:
            ignore_default_error = True
        else:
            ignore_default_error = False

        # Always parse the writer schema since it might have named types that
        # need to be stored in self._named_schemas
        self.writer_schema = parse_schema(
            self._schema,
            self._named_schemas["writer"],
            _write_hint=False,
            _force=True,
            _ignore_default_error=ignore_default_error,
        )

    @property
    def schema(self):
        import warnings

        warnings.warn(
            "The 'schema' attribute is deprecated. Please use 'writer_schema'",
            DeprecationWarning,
        )
        return self._schema

    def __iter__(self) -> Iterator[T]:
        if not self._elems:
            raise NotImplementedError
        return self._elems

    def __next__(self) -> T:
        return next(self._elems)


class reader(file_reader[AvroMessage]):
    """Iterator over records in an avro file.

    Parameters
    ----------
    fo
        File-like object to read from
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo)
            for record in avro_reader:
                process_record(record)

    Since the `fo` argument is a file-like object, another common usage is
    with an `io.BytesIO` object::

        from io import BytesIO
        from fastavro import writer, reader

        fo = BytesIO()
        writer(fo, schema, records)
        fo.seek(0)
        for record in reader(fo):
            process_record(record)
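
    When a `reader_schema` is given, each record is migrated from the schema
    the file was written with to the reader schema as it is read. A sketch,
    where `new_schema` stands for a hypothetical evolved version of the
    writer's schema::

        from fastavro import reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = reader(fo, reader_schema=new_schema)
            for record in avro_reader:
                process_record(record)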

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: Union[IO, AvroJSONDecoder],
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        if isinstance(self.decoder, AvroJSONDecoder):
            self.decoder.configure(self.reader_schema, self._named_schemas["reader"])

            self.writer_schema = self.reader_schema
            self.reader_schema = None
            self._named_schemas["writer"] = self._named_schemas["reader"]
            self._named_schemas["reader"] = {}

            def _elems():
                while not self.decoder.done:
                    yield read_data(
                        self.decoder,
                        self.writer_schema,
                        self._named_schemas,
                        self.reader_schema,
                        self.options,
                    )
                    self.decoder.drain()

            self._elems = _elems()

        else:
            self._read_header()

            self._elems = _iter_avro_records(
                self.decoder,
                self._header,
                self.codec,
                self.writer_schema,
                self._named_schemas,
                self.reader_schema,
                self.options,
            )


class block_reader(file_reader[Block]):
    """Iterator over :class:`.Block` in an avro file.

    Parameters
    ----------
    fo
        Input stream
    reader_schema
        Reader schema
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        from fastavro import block_reader
        with open('some-file.avro', 'rb') as fo:
            avro_reader = block_reader(fo)
            for block in avro_reader:
                process_block(block)

    .. attribute:: metadata

        Key-value pairs in the header metadata

    .. attribute:: codec

        The codec used when writing

    .. attribute:: writer_schema

        The schema used when writing

    .. attribute:: reader_schema

        The schema used when reading (if provided)
    """

    def __init__(
        self,
        fo: IO,
        reader_schema: Optional[Schema] = None,
        return_record_name: bool = False,
        return_record_name_override: bool = False,
        handle_unicode_errors: str = "strict",
        return_named_type: bool = False,
        return_named_type_override: bool = False,
    ):
        options = {
            "return_record_name": return_record_name,
            "return_record_name_override": return_record_name_override,
            "handle_unicode_errors": handle_unicode_errors,
            "return_named_type": return_named_type,
            "return_named_type_override": return_named_type_override,
        }
        super().__init__(fo, reader_schema, options)

        self._read_header()

        self._elems = _iter_avro_blocks(
            self.decoder,
            self._header,
            self.codec,
            self.writer_schema,
            self._named_schemas,
            self.reader_schema,
            self.options,
        )


def schemaless_reader(
    fo: IO,
    writer_schema: Schema,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    handle_unicode_errors: str = "strict",
    return_named_type: bool = False,
    return_named_type_override: bool = False,
) -> AvroMessage:
    """Reads a single record written using the
    :meth:`~fastavro._write_py.schemaless_writer`

    Parameters
    ----------
    fo
        Input stream
    writer_schema
        Schema used when calling schemaless_writer
    reader_schema
        If the schema has changed since being written then the new schema can
        be given to allow for schema migration
    return_record_name
        If true, when reading a union of records, the result will be a tuple
        where the first value is the name of the record and the second value is
        the record itself
    return_record_name_override
        If true, this will modify the behavior of return_record_name so that
        the record name is only returned for unions where there is more than
        one record. For unions that only have one record, this option will make
        it so that the record is returned by itself, not a tuple with the name.
    return_named_type
        If true, when reading a union of named types, the result will be a tuple
        where the first value is the name of the type and the second value is
        the record itself
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override
    return_named_type_override
        If true, this will modify the behavior of return_named_type so that
        the named type is only returned for unions where there is more than
        one named type. For unions that only have one named type, this option
        will make it so that the named type is returned by itself, not a tuple
        with the name
    handle_unicode_errors
        Default `strict`. Should be set to a valid string that can be used in
        the errors argument of the string decode() function. Examples include
        `replace` and `ignore`


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)

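    A round-trip sketch with :meth:`~fastavro._write_py.schemaless_writer`,
    assuming `schema` and `record` are already defined::

        from io import BytesIO
        from fastavro import schemaless_writer, schemaless_reader

        fo = BytesIO()
        schemaless_writer(fo, schema, record)
        fo.seek(0)
        result = schemaless_reader(fo, schema)  # the record written above
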
    Note: The ``schemaless_reader`` can only read a single record.
    """
    if writer_schema == reader_schema:
        # No need for the reader schema if they are the same
        reader_schema = None

    named_schemas: Dict[str, NamedSchemas] = _default_named_schemas()
    writer_schema = parse_schema(writer_schema, named_schemas["writer"])

    if reader_schema:
        reader_schema = parse_schema(reader_schema, named_schemas["reader"])

    decoder = BinaryDecoder(fo)

    options = {
        "return_record_name": return_record_name,
        "return_record_name_override": return_record_name_override,
        "handle_unicode_errors": handle_unicode_errors,
        "return_named_type": return_named_type,
        "return_named_type_override": return_named_type_override,
    }

    return read_data(
        decoder,
        writer_schema,
        named_schemas,
        reader_schema,
        options,
    )


def is_avro(path_or_buffer: Union[str, IO]) -> bool:
1315 """Return True if path (or buffer) points to an Avro file. This will only
1316 work for avro files that contain the normal avro schema header like those
1317 create from :func:`~fastavro._write_py.writer`. This function is not intended
1318 to be used with binary data created from
1319 :func:`~fastavro._write_py.schemaless_writer` since that does not include the
1320 avro header.

    Parameters
    ----------
    path_or_buffer
        Path to a file, or a file-like object
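
    Example::

        from fastavro import is_avro

        # 'some-file.avro' is a hypothetical path
        print(is_avro('some-file.avro'))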
1326 """
1327 fp: IO
1328 if isinstance(path_or_buffer, str):
1329 fp = open(path_or_buffer, "rb")
1330 close = True
1331 else:
1332 fp = path_or_buffer
1333 close = False
1334
1335 try:
1336 header = fp.read(len(MAGIC))
1337 return header == MAGIC
1338 finally:
1339 if close:
1340 fp.close()