1"""Python code for writing AVRO files"""
2
3# This code is a modified version of the code at
4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)
6
7from abc import ABC, abstractmethod
8import json
9from io import BytesIO
10from os import urandom, SEEK_SET
11import bz2
12import lzma
13import zlib
14from typing import Union, IO, Iterable, Any, Optional, Dict
15from warnings import warn
16
17from .const import NAMED_TYPES
18from .io.binary_encoder import BinaryEncoder
19from .io.json_encoder import AvroJSONEncoder
20from .validation import _validate
21from .read import HEADER_SCHEMA, SYNC_SIZE, MAGIC, reader
22from .logical_writers import LOGICAL_WRITERS
23from .schema import extract_record_type, extract_logical_type, parse_schema
24from ._write_common import _is_appendable
25from .types import Schema, NamedSchemas
26
27
28def write_null(encoder, datum, schema, named_schemas, fname, options):
29 """null is written as zero bytes"""
30 encoder.write_null()
31
32
33def write_boolean(encoder, datum, schema, named_schemas, fname, options):
34 """A boolean is written as a single byte whose value is either 0 (false) or
35 1 (true)."""
36 encoder.write_boolean(datum)
37
38
39def write_int(encoder, datum, schema, named_schemas, fname, options):
40 """int and long values are written using variable-length, zig-zag coding."""
41 encoder.write_int(datum)
42
43
44def write_long(encoder, datum, schema, named_schemas, fname, options):
45 """int and long values are written using variable-length, zig-zag coding."""
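    # Worked example (illustrative only; not used by the code): zig-zag coding
    # maps signed values to unsigned ones so small magnitudes stay small:
    #   0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...
    # The result is then written as a variable-length integer, 7 bits per byte
    # with the high bit as a continuation flag, so 1 encodes as the single
    # byte 0x02 and -64 as the single byte 0x7f.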
    encoder.write_long(datum)


def write_float(encoder, datum, schema, named_schemas, fname, options):
    """A float is written as 4 bytes. The float is converted into a 32-bit
    integer using a method equivalent to Java's floatToIntBits and then encoded
    in little-endian format."""
    encoder.write_float(datum)


def write_double(encoder, datum, schema, named_schemas, fname, options):
    """A double is written as 8 bytes. The double is converted into a 64-bit
    integer using a method equivalent to Java's doubleToLongBits and then
    encoded in little-endian format."""
    encoder.write_double(datum)


def write_bytes(encoder, datum, schema, named_schemas, fname, options):
    """Bytes are encoded as a long followed by that many bytes of data."""
    encoder.write_bytes(datum)


def write_utf8(encoder, datum, schema, named_schemas, fname, options):
    """A string is encoded as a long followed by that many bytes of UTF-8
    encoded character data."""
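    # Illustration (not used by the code): the string "foo" is written as the
    # zig-zag encoded length 3 (0x06) followed by its UTF-8 bytes,
    # i.e. 06 66 6f 6f.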
    encoder.write_utf8(datum)


def write_crc32(encoder, datum):
    """A 4-byte, big-endian CRC32 checksum"""
    encoder.write_crc32(datum)


def write_fixed(encoder, datum, schema, named_schemas, fname, options):
    """Fixed instances are encoded using the number of bytes declared in the
    schema."""
    if len(datum) != schema["size"]:
        raise ValueError(
            f"data of length {len(datum)} does not match schema size: {schema}"
        )
    encoder.write_fixed(datum)


def write_enum(encoder, datum, schema, named_schemas, fname, options):
    """An enum is encoded by an int, representing the zero-based position of
    the symbol in the schema."""
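    # Illustration (not used by the code): with symbols ["A", "B", "C"], the
    # datum "C" is written as its zig-zag encoded index 2, i.e. 0x04.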
    index = schema["symbols"].index(datum)
    encoder.write_enum(index)


def write_array(encoder, datum, schema, named_schemas, fname, options):
    """Arrays are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many array
    items. A block with count zero indicates the end of the array. Each item
    is encoded per the array's item schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
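    # Illustration (not used by the code): an array of longs [1, 2, 3] is
    # written as one block: count 3 (0x06), items 0x02 0x04 0x06, then the
    # end-of-array marker 0x00, i.e. 06 02 04 06 00.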
    encoder.write_array_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        dtype = schema["items"]
        for item in datum:
            write_data(encoder, item, dtype, named_schemas, fname, options)
            encoder.end_item()
    encoder.write_array_end()


def write_map(encoder, datum, schema, named_schemas, fname, options):
    """Maps are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many key/value
    pairs. A block with count zero indicates the end of the map. Each item is
    encoded per the map's value schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
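    # Illustration (not used by the code): a map {"a": 1} with long values is
    # written as count 1 (0x02), key "a" (0x02 0x61), value 1 (0x02), then the
    # end-of-map marker 0x00.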
    encoder.write_map_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        vtype = schema["values"]
        for key, val in datum.items():
            encoder.write_utf8(key)
            write_data(encoder, val, vtype, named_schemas, fname, options)
    encoder.write_map_end()


def write_union(encoder, datum, schema, named_schemas, fname, options):
    """A union is encoded by first writing a long value indicating the
    zero-based position within the union of the schema of its value. The value
    is then encoded per the indicated schema within the union."""
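    # Illustration (not used by the code): for the union schema
    # ["null", "long"], the long value 5 is written as the zig-zag encoded
    # index of "long" (1 -> 0x02) followed by the encoded value (5 -> 0x0a).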

    best_match_index = -1
    if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
        (name, datum) = datum
        for index, candidate in enumerate(schema):
            extracted_type = extract_record_type(candidate)
            if extracted_type in NAMED_TYPES:
                schema_name = candidate["name"]
            else:
                schema_name = extracted_type
            if name == schema_name:
                best_match_index = index
                break

        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            msg = (
                f"provided union type name {name} not found in schema "
                + f"{schema} {field}"
            )
            raise ValueError(msg)
        index = best_match_index
    else:
        pytype = type(datum)
        most_fields = -1

        # All of Python's floating point values are doubles, so to
        # avoid loss of precision, we should always prefer 'double'
        # if we are forced to choose between float and double.
        #
        # If 'double' comes before 'float' in the union, then we'll immediately
        # choose it, and don't need to worry. But if 'float' comes before
        # 'double', we don't want to pick it.
        #
        # So, if we ever see 'float', we skim through the rest of the options,
        # just to see if 'double' is a possibility, because we'd prefer it.
        could_be_float = False

        for index, candidate in enumerate(schema):
            if could_be_float:
                if extract_record_type(candidate) == "double":
                    best_match_index = index
                    break
                else:
                    # Nothing except "double" is even worth considering.
                    continue

            if _validate(
                datum,
                candidate,
                named_schemas,
                raise_errors=False,
                field="",
                options=options,
            ):
                record_type = extract_record_type(candidate)
                if record_type in named_schemas:
                    # Convert named record types into their full schema so that we can check most_fields
                    candidate = named_schemas[record_type]
                    record_type = extract_record_type(candidate)

                if record_type == "record":
                    logical_type = extract_logical_type(candidate)
                    if logical_type:
                        prepare = LOGICAL_WRITERS.get(logical_type)
                        if prepare:
                            datum = prepare(datum, candidate)

                    candidate_fields = set(f["name"] for f in candidate["fields"])
                    datum_fields = set(datum)
                    fields = len(candidate_fields.intersection(datum_fields))
                    if fields > most_fields:
                        best_match_index = index
                        most_fields = fields
                elif record_type == "float":
                    best_match_index = index
                    # Continue in the loop, because it's possible that there's
                    # another candidate which has record type 'double'
                    could_be_float = True
                else:
                    best_match_index = index
                    break
        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            raise ValueError(
                f"{repr(datum)} (type {pytype}) does not match {schema} {field}"
            )
        index = best_match_index

    # write data
    # TODO: There should be a way to give just the index
    encoder.write_index(index, schema[index])
    write_data(encoder, datum, schema[index], named_schemas, fname, options)


def write_record(encoder, datum, schema, named_schemas, fname, options):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema."""
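    # Illustration (not used by the code): for a record with fields
    # [{"name": "a", "type": "long"}, {"name": "b", "type": "string"}], the
    # datum {"a": 1, "b": "x"} is written as 0x02 followed by 0x02 0x78, with
    # no field names or delimiters in the output.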
    extras = set(datum) - set(field["name"] for field in schema["fields"])
    if (options.get("strict") or options.get("strict_allow_default")) and extras:
        raise ValueError(
            f'record contains more fields than the schema specifies: {", ".join(extras)}'
        )
    for field in schema["fields"]:
        name = field["name"]
        field_type = field["type"]
        if name not in datum:
            if options.get("strict") or (
                options.get("strict_allow_default") and "default" not in field
            ):
                raise ValueError(
                    f"Field {name} is specified in the schema but missing from the record"
                )
            elif "default" not in field and "null" not in field_type:
                raise ValueError(f"no value and no default for {name}")
        datum_value = datum.get(name, field.get("default"))
        if field_type == "float" or field_type == "double":
            # Handle float values like "NaN"
            datum_value = float(datum_value)
        write_data(
            encoder,
            datum_value,
            field_type,
            named_schemas,
            name,
            options,
        )


WRITERS = {
    "null": write_null,
    "boolean": write_boolean,
    "string": write_utf8,
    "int": write_int,
    "long": write_long,
    "float": write_float,
    "double": write_double,
    "bytes": write_bytes,
    "fixed": write_fixed,
    "enum": write_enum,
    "array": write_array,
    "map": write_map,
    "union": write_union,
    "error_union": write_union,
    "record": write_record,
    "error": write_record,
}


def write_data(encoder, datum, schema, named_schemas, fname, options):
    """Write a datum to the output stream.

    Parameters
    ----------
    encoder: encoder
        Type of encoder (e.g. binary or json)
    datum: object
        Data to write
    schema: dict
        Schema to use
    named_schemas: dict
        Mapping of fullname to schema definition
    fname: str
        Name of the field being written; used in error messages
    options: dict
        Writer options (e.g. strict, disable_tuple_notation)
    """

    record_type = extract_record_type(schema)
    logical_type = extract_logical_type(schema)

    fn = WRITERS.get(record_type)
    if fn:
        if logical_type:
            prepare = LOGICAL_WRITERS.get(logical_type)
            if prepare:
                datum = prepare(datum, schema)
        try:
            return fn(encoder, datum, schema, named_schemas, fname, options)
        except TypeError as ex:
            if fname:
                raise TypeError(f"{ex} on field {fname}")
            raise
    else:
        return write_data(
            encoder, datum, named_schemas[record_type], named_schemas, "", options
        )


def write_header(encoder, metadata, sync_marker):
    header = {
        "magic": MAGIC,
        "meta": {key: value.encode() for key, value in metadata.items()},
        "sync": sync_marker,
    }
    write_data(encoder, header, HEADER_SCHEMA, {}, "", {})


def null_write_block(encoder, block_bytes, compression_level):
    """Write block in "null" codec."""
    encoder.write_long(len(block_bytes))
    encoder._fo.write(block_bytes)


def deflate_write_block(encoder, block_bytes, compression_level):
    """Write block in "deflate" codec."""
    # The first two bytes and the last byte are zlib
    # wrappers around the deflate data.
    if compression_level is not None:
        data = zlib.compress(block_bytes, compression_level)[2:-1]
    else:
        data = zlib.compress(block_bytes)[2:-1]
    encoder.write_long(len(data))
    encoder._fo.write(data)


def bzip2_write_block(encoder, block_bytes, compression_level):
    """Write block in "bzip2" codec."""
    data = bz2.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


def xz_write_block(encoder, block_bytes, compression_level):
    """Write block in "xz" codec."""
    data = lzma.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


BLOCK_WRITERS = {
    "null": null_write_block,
    "deflate": deflate_write_block,
    "bzip2": bzip2_write_block,
    "xz": xz_write_block,
}
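# Every block writer has the same contract: it is called with the binary
# encoder, the already-encoded block bytes, and an optional compression level,
# and it writes the byte length of the (possibly compressed) data as an Avro
# long followed by the data itself (the snappy codec defined below also
# appends a CRC32 of the uncompressed block).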


def _missing_codec_lib(codec, *libraries):
    def missing(encoder, block_bytes, compression_level):
        raise ValueError(
            f"{codec} codec is supported but you need to install one of the "
            + f"following libraries: {libraries}"
        )

    return missing


def snappy_write_block(encoder, block_bytes, compression_level):
    """Write block in "snappy" codec."""
    data = snappy_compress(block_bytes)
    encoder.write_long(len(data) + 4)  # for CRC
    encoder._fo.write(data)
    encoder.write_crc32(block_bytes)


try:
    from cramjam import snappy

    snappy_compress = snappy.compress_raw
except ImportError:
    try:
        import snappy

        snappy_compress = snappy.compress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_WRITERS["snappy"] = _missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_WRITERS["snappy"] = snappy_write_block
else:
    BLOCK_WRITERS["snappy"] = snappy_write_block


def zstandard_write_block(encoder, block_bytes, compression_level):
    """Write block in "zstandard" codec."""
    if compression_level is not None:
        data = zstandard.ZstdCompressor(level=compression_level).compress(block_bytes)
    else:
        data = zstandard.ZstdCompressor().compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import zstandard
except ImportError:
    BLOCK_WRITERS["zstandard"] = _missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_WRITERS["zstandard"] = zstandard_write_block


def lz4_write_block(encoder, block_bytes, compression_level):
    """Write block in "lz4" codec."""
    data = lz4.block.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import lz4.block
except ImportError:
    BLOCK_WRITERS["lz4"] = _missing_codec_lib("lz4", "lz4")
else:
    BLOCK_WRITERS["lz4"] = lz4_write_block


class GenericWriter(ABC):
    def __init__(self, schema, metadata=None, validator=None, options={}):
        self._named_schemas = {}
        self.validate_fn = _validate if validator else None
        self.metadata = metadata or {}
        self.options = options

        # A schema of None is allowed when appending; in that case self.schema
        # will be set later from the existing file's header
        if schema is not None:
            self.schema = parse_schema(schema, self._named_schemas)

        if isinstance(schema, dict):
            schema = {
                key: value
                for key, value in schema.items()
                if key not in ("__fastavro_parsed", "__named_schemas")
            }
        elif isinstance(schema, list):
            schemas = []
            for s in schema:
                if isinstance(s, dict):
                    schemas.append(
                        {
                            key: value
                            for key, value in s.items()
                            if key
                            not in (
                                "__fastavro_parsed",
                                "__named_schemas",
                            )
                        }
                    )
                else:
                    schemas.append(s)
            schema = schemas

        self.metadata["avro.schema"] = json.dumps(schema)

    @abstractmethod
    def write(self, record):
        pass

    @abstractmethod
    def flush(self):
        pass


class Writer(GenericWriter):
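    """Write Avro records to a binary object container file, one block at a
    time.

    A minimal usage sketch (assuming this class is exposed as
    ``fastavro.write.Writer``)::

        from fastavro.write import Writer

        with open("weather.avro", "wb") as out:
            avro_writer = Writer(out, parsed_schema)
            for record in records:
                avro_writer.write(record)
            avro_writer.flush()
    """
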
    def __init__(
        self,
        fo: Union[IO, BinaryEncoder],
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.metadata["avro.codec"] = codec
        if isinstance(fo, BinaryEncoder):
            self.encoder = fo
        else:
            self.encoder = BinaryEncoder(fo)
        self.io = BinaryEncoder(BytesIO())
        self.block_count = 0
        self.sync_interval = sync_interval
        self.compression_level = compression_level

        if _is_appendable(self.encoder._fo):
            # Seek to the beginning to read the header
            self.encoder._fo.seek(0)
            avro_reader = reader(self.encoder._fo)
            header = avro_reader._header

            self._named_schemas = {}
            self.schema = parse_schema(avro_reader.writer_schema, self._named_schemas)

            codec = avro_reader.metadata.get("avro.codec", "null")

            self.sync_marker = header["sync"]

            # Seek to the end of the file
            self.encoder._fo.seek(0, 2)

            self.block_writer = BLOCK_WRITERS[codec]
        else:
            self.sync_marker = sync_marker or urandom(SYNC_SIZE)

            try:
                self.block_writer = BLOCK_WRITERS[codec]
            except KeyError:
                raise ValueError(f"unrecognized codec: {codec}")

            write_header(self.encoder, self.metadata, self.sync_marker)

    def dump(self):
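        """Flush the in-memory block to the output stream.

        Writes the record count, hands the buffered bytes to the configured
        block writer (which writes their length and the possibly compressed
        data), appends the sync marker, and resets the buffer.
        """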
        self.encoder.write_long(self.block_count)
        self.block_writer(self.encoder, self.io._fo.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)
        self.io._fo.truncate(0)
        self.io._fo.seek(0, SEEK_SET)
        self.block_count = 0

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(self.io, record, self.schema, self._named_schemas, "", self.options)
        self.block_count += 1
        if self.io._fo.tell() >= self.sync_interval:
            self.dump()

    def write_block(self, block):
        # Flush any pending records before writing the pre-encoded block
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder.write_long(block.num_records)
        self.block_writer(self.encoder, block.bytes_.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)

    def flush(self):
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder._fo.flush()


class JSONWriter(GenericWriter):
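    """Write records using the Avro JSON encoding via an ``AvroJSONEncoder``.

    The ``codec``, ``sync_interval``, and other container-file arguments are
    accepted for signature compatibility with ``Writer`` but are not used
    here.
    """
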
    def __init__(
        self,
        fo: AvroJSONEncoder,
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        codec_compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.encoder = fo
        self.encoder.configure(self.schema, self._named_schemas)

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(
            self.encoder, record, self.schema, self._named_schemas, "", self.options
        )

    def flush(self):
        self.encoder.flush()


def writer(
    fo: Union[IO, AvroJSONEncoder],
    schema: Schema,
    records: Iterable[Any],
    codec: str = "null",
    sync_interval: int = 1000 * SYNC_SIZE,
    metadata: Optional[Dict[str, str]] = None,
    validator: bool = False,
    sync_marker: bytes = b"",
    codec_compression_level: Optional[int] = None,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write records to fo (stream) according to schema

    Parameters
    ----------
    fo
        Output stream
    schema
        Writer schema
    records
        Records to write. This is commonly a list of the dictionary
        representation of the records, but it can be any iterable
    codec
        Compression codec, can be 'null', 'deflate', 'bzip2', 'xz', or, if the
        relevant library is installed, 'snappy', 'zstandard', or 'lz4'
    sync_interval
        Size of sync interval
    metadata
        Header metadata
    validator
        If true, validation will be done on the records
    sync_marker
        A byte string used as the avro sync marker. If not provided, a random
        byte string will be used.
    codec_compression_level
        Compression level to use with the specified codec (if the codec
        supports it)
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        from fastavro import writer, parse_schema

        schema = {
            'doc': 'A weather reading.',
            'name': 'Weather',
            'namespace': 'test',
            'type': 'record',
            'fields': [
                {'name': 'station', 'type': 'string'},
                {'name': 'time', 'type': 'long'},
                {'name': 'temp', 'type': 'int'},
            ],
        }
        parsed_schema = parse_schema(schema)

        records = [
            {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
            {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
            {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
            {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
        ]

        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer

        fo = BytesIO()
        writer(fo, schema, records)
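
    To compress the file, pass one of the supported codecs (and, for codecs
    that support it, `codec_compression_level`), for example::

        writer(fo, schema, records, codec='deflate')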

    Given an existing avro file, it's possible to append to it by re-opening
    the file in `a+b` mode. If the file is only opened in `ab` mode, we aren't
    able to read some of the existing header information and an error will be
    raised. For example::

        # Write initial records
        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

        # Write some more records
        with open('weather.avro', 'a+b') as out:
            writer(out, None, more_records)

    Note: When appending, any schema provided will be ignored since the schema
    in the avro file will be re-used. Therefore it is convenient to just use
    None as the schema.
    """
    # Sanity check that records is not a single dictionary (as that is a common
    # mistake and the exception that gets raised is not helpful)
    if isinstance(records, dict):
        raise ValueError('"records" argument should be an iterable, not dict')

    output: Union[JSONWriter, Writer]
    if isinstance(fo, AvroJSONEncoder):
        output = JSONWriter(
            fo,
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )
    else:
        output = Writer(
            BinaryEncoder(fo),
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )

    for record in records:
        output.write(record)
    output.flush()


def schemaless_writer(
    fo: IO,
    schema: Schema,
    record: Any,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write a single record without the schema or header information

    Parameters
    ----------
    fo
        Output file
    schema
        Schema
    record
        Record to write
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'wb') as fp:
            fastavro.schemaless_writer(fp, parsed_schema, record)

    Note: The ``schemaless_writer`` can only write a single record.
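
    A record written this way is typically read back with the schemaless
    reader (a sketch assuming the symmetric API)::

        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)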
    """
    named_schemas: NamedSchemas = {}
    schema = parse_schema(schema, named_schemas)

    encoder = BinaryEncoder(fo)
    write_data(
        encoder,
        record,
        schema,
        named_schemas,
        "",
        {
            "strict": strict,
            "strict_allow_default": strict_allow_default,
            "disable_tuple_notation": disable_tuple_notation,
        },
    )
    encoder.flush()