1"""Python code for writing AVRO files"""

# This code is a modified version of the code at
# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)

from abc import ABC, abstractmethod
import json
from io import BytesIO
from os import urandom, SEEK_SET
import bz2
import lzma
import zlib
from typing import Union, IO, Iterable, Any, Optional, Dict
from warnings import warn

from .const import NAMED_TYPES
from .io.binary_encoder import BinaryEncoder
from .io.json_encoder import AvroJSONEncoder
from .validation import _validate
from .read import HEADER_SCHEMA, SYNC_SIZE, MAGIC, reader
from .logical_writers import LOGICAL_WRITERS
from .schema import extract_record_type, extract_logical_type, parse_schema
from ._write_common import _is_appendable
from .types import Schema, NamedSchemas


def write_null(encoder, datum, schema, named_schemas, fname, options):
    """null is written as zero bytes"""
    encoder.write_null()


def write_boolean(encoder, datum, schema, named_schemas, fname, options):
    """A boolean is written as a single byte whose value is either 0 (false) or
    1 (true)."""
    encoder.write_boolean(datum)


def write_int(encoder, datum, schema, named_schemas, fname, options):
    """int and long values are written using variable-length, zig-zag coding."""
    encoder.write_int(datum)


def write_long(encoder, datum, schema, named_schemas, fname, options):
    """int and long values are written using variable-length, zig-zag coding."""
    encoder.write_long(datum)


def write_float(encoder, datum, schema, named_schemas, fname, options):
    """A float is written as 4 bytes. The float is converted into a 32-bit
    integer using a method equivalent to Java's floatToIntBits and then encoded
    in little-endian format."""
    encoder.write_float(datum)


def write_double(encoder, datum, schema, named_schemas, fname, options):
    """A double is written as 8 bytes. The double is converted into a 64-bit
    integer using a method equivalent to Java's doubleToLongBits and then
    encoded in little-endian format."""
    encoder.write_double(datum)


def write_bytes(encoder, datum, schema, named_schemas, fname, options):
    """Bytes are encoded as a long followed by that many bytes of data."""
    encoder.write_bytes(datum)


def write_utf8(encoder, datum, schema, named_schemas, fname, options):
    """A string is encoded as a long followed by that many bytes of UTF-8
    encoded character data."""
    encoder.write_utf8(datum)


def write_crc32(encoder, datum):
    """A 4-byte, big-endian CRC32 checksum"""
    encoder.write_crc32(datum)


def write_fixed(encoder, datum, schema, named_schemas, fname, options):
    """Fixed instances are encoded using the number of bytes declared in the
    schema."""
    if len(datum) != schema["size"]:
        raise ValueError(
            f"data of length {len(datum)} does not match schema size: {schema}"
        )
    encoder.write_fixed(datum)


def write_enum(encoder, datum, schema, named_schemas, fname, options):
    """An enum is encoded as an int, representing the zero-based position of
    the symbol in the schema."""
    index = schema["symbols"].index(datum)
    encoder.write_enum(index)


def write_array(encoder, datum, schema, named_schemas, fname, options):
    """Arrays are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many array
    items. A block with count zero indicates the end of the array. Each item
    is encoded per the array's item schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
    encoder.write_array_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        dtype = schema["items"]
        for item in datum:
            write_data(encoder, item, dtype, named_schemas, fname, options)
            encoder.end_item()
    encoder.write_array_end()
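
# Illustrative sketch (not executed): with the binary encoder, writing the
# datum [1, 2] against {"type": "array", "items": "long"} produces a single
# block: the zig-zag varint count 0x04 (two items), the items 0x02 and 0x04
# (the longs 1 and 2), and the terminating count byte 0x00.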


def write_map(encoder, datum, schema, named_schemas, fname, options):
    """Maps are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many key/value
    pairs. A block with count zero indicates the end of the map. Each item is
    encoded per the map's value schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
    encoder.write_map_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        vtype = schema["values"]
        for key, val in datum.items():
            encoder.write_utf8(key)
            write_data(encoder, val, vtype, named_schemas, fname, options)
    encoder.write_map_end()


def write_union(encoder, datum, schema, named_schemas, fname, options):
    """A union is encoded by first writing a long value indicating the
    zero-based position within the union of the schema of its value. The value
    is then encoded per the indicated schema within the union."""

    best_match_index = -1
    if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
        (name, datum) = datum
        for index, candidate in enumerate(schema):
            extracted_type = extract_record_type(candidate)
            if extracted_type in NAMED_TYPES:
                schema_name = candidate["name"]
            else:
                schema_name = extracted_type
            if name == schema_name:
                best_match_index = index
                break

        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            msg = (
                f"provided union type name {name} not found in schema "
                + f"{schema} {field}"
            )
            raise ValueError(msg)
        index = best_match_index
    else:
        pytype = type(datum)
        most_fields = -1

        # All of Python's floating point values are doubles, so to
        # avoid loss of precision, we should always prefer 'double'
        # if we are forced to choose between float and double.
        #
        # If 'double' comes before 'float' in the union, then we'll immediately
        # choose it, and don't need to worry. But if 'float' comes before
        # 'double', we don't want to pick it.
        #
        # So, if we ever see 'float', we skim through the rest of the options,
        # just to see if 'double' is a possibility, because we'd prefer it.
        could_be_float = False

        for index, candidate in enumerate(schema):
            if could_be_float:
                if extract_record_type(candidate) == "double":
                    best_match_index = index
                    break
                else:
                    # Nothing except "double" is even worth considering.
                    continue

            if _validate(
                datum,
                candidate,
                named_schemas,
                raise_errors=False,
                field="",
                options=options,
            ):
                record_type = extract_record_type(candidate)
                if record_type == "record":
                    logical_type = extract_logical_type(candidate)
                    if logical_type:
                        prepare = LOGICAL_WRITERS.get(logical_type)
                        if prepare:
                            datum = prepare(datum, candidate)

                    candidate_fields = set(f["name"] for f in candidate["fields"])
                    datum_fields = set(datum)
                    fields = len(candidate_fields.intersection(datum_fields))
                    if fields > most_fields:
                        best_match_index = index
                        most_fields = fields
                elif record_type == "float":
                    best_match_index = index
                    # Continue in the loop, because it's possible that there's
                    # another candidate which has record type 'double'
                    could_be_float = True
                else:
                    best_match_index = index
                    break
        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            raise ValueError(
                f"{repr(datum)} (type {pytype}) does not match {schema} {field}"
            )
        index = best_match_index

    # write data
    # TODO: There should be a way to give just the index
    encoder.write_index(index, schema[index])
    write_data(encoder, datum, schema[index], named_schemas, fname, options)
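
# Illustrative sketch (not executed): for a union such as
# ["null", "test.Weather"], a plain dict datum is matched by validating it
# against each branch, while the tuple notation ("test.Weather", {...})
# selects the branch by name up front (tuples are not special-cased when the
# disable_tuple_notation option is set).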


def write_record(encoder, datum, schema, named_schemas, fname, options):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema."""
    extras = set(datum) - set(field["name"] for field in schema["fields"])
    if (options.get("strict") or options.get("strict_allow_default")) and extras:
        raise ValueError(
            f'record contains more fields than the schema specifies: {", ".join(extras)}'
        )
    for field in schema["fields"]:
        name = field["name"]
        field_type = field["type"]
        if name not in datum:
            if options.get("strict") or (
                options.get("strict_allow_default") and "default" not in field
            ):
                raise ValueError(
                    f"Field {name} is specified in the schema but missing from the record"
                )
            elif "default" not in field and "null" not in field_type:
                raise ValueError(f"no value and no default for {name}")
        datum_value = datum.get(name, field.get("default"))
        if field_type == "float" or field_type == "double":
            # Handle float values like "NaN"
            datum_value = float(datum_value)
        write_data(
            encoder,
            datum_value,
            field_type,
            named_schemas,
            name,
            options,
        )
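
# Illustrative sketch (not executed): with the "strict" option a record datum
# must contain exactly the fields declared in the schema; with
# "strict_allow_default" a declared field may be omitted only if the schema
# gives it a "default". Without either option, missing fields fall back to
# their default, or to None when the field type allows "null". Both strict
# modes also reject fields present in the datum but absent from the schema.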


WRITERS = {
    "null": write_null,
    "boolean": write_boolean,
    "string": write_utf8,
    "int": write_int,
    "long": write_long,
    "float": write_float,
    "double": write_double,
    "bytes": write_bytes,
    "fixed": write_fixed,
    "enum": write_enum,
    "array": write_array,
    "map": write_map,
    "union": write_union,
    "error_union": write_union,
    "record": write_record,
    "error": write_record,
}


def write_data(encoder, datum, schema, named_schemas, fname, options):
    """Write a datum to the output stream.

    Parameters
    ----------
    encoder: encoder
        Type of encoder (e.g. binary or json)
    datum: object
        Data to write
    schema: dict
        Schema to use
    named_schemas: dict
        Mapping of fullname to schema definition
    fname: str
        Name of the field being written; used in error messages
    options: dict
        Writer options such as strict, strict_allow_default, and
        disable_tuple_notation
    """

    record_type = extract_record_type(schema)
    logical_type = extract_logical_type(schema)

    fn = WRITERS.get(record_type)
    if fn:
        if logical_type:
            prepare = LOGICAL_WRITERS.get(logical_type)
            if prepare:
                datum = prepare(datum, schema)
        try:
            return fn(encoder, datum, schema, named_schemas, fname, options)
        except TypeError as ex:
            if fname:
                raise TypeError(f"{ex} on field {fname}")
            raise
    else:
        return write_data(
            encoder, datum, named_schemas[record_type], named_schemas, "", options
        )
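
# Illustrative sketch (not executed): extract_record_type maps a schema like
# {"type": "array", "items": "string"} to "array", which dispatches to
# write_array above, while a bare reference to a named type (e.g.
# "test.Weather") is not in WRITERS, so the else branch resolves it through
# named_schemas and recurses.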


def write_header(encoder, metadata, sync_marker):
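    """Write the Avro container-file header: magic bytes, the file metadata
    map (schema, codec, ...), and the 16-byte sync marker."""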
    header = {
        "magic": MAGIC,
        "meta": {key: value.encode() for key, value in metadata.items()},
        "sync": sync_marker,
    }
    write_data(encoder, header, HEADER_SCHEMA, {}, "", {})


def null_write_block(encoder, block_bytes, compression_level):
    """Write block in "null" codec."""
    encoder.write_long(len(block_bytes))
    encoder._fo.write(block_bytes)


def deflate_write_block(encoder, block_bytes, compression_level):
    """Write block in "deflate" codec."""
    # The first two bytes and the last byte are zlib
    # framing around the raw deflate data.
    if compression_level is not None:
        data = zlib.compress(block_bytes, compression_level)[2:-1]
    else:
        data = zlib.compress(block_bytes)[2:-1]
    encoder.write_long(len(data))
    encoder._fo.write(data)


def bzip2_write_block(encoder, block_bytes, compression_level):
    """Write block in "bzip2" codec."""
    data = bz2.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


def xz_write_block(encoder, block_bytes, compression_level):
    """Write block in "xz" codec."""
    data = lzma.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


BLOCK_WRITERS = {
    "null": null_write_block,
    "deflate": deflate_write_block,
    "bzip2": bzip2_write_block,
    "xz": xz_write_block,
}
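
# Codec lookup is a plain dict lookup (see Writer.__init__ below), so an extra
# codec can in principle be registered by adding an entry here. A minimal
# sketch, assuming "my_compress" is some hypothetical compression function and
# that a matching decoder is registered under the same name on the read side
# (fastavro.read.BLOCK_READERS):
#
#     def my_codec_write_block(encoder, block_bytes, compression_level):
#         data = my_compress(block_bytes)
#         encoder.write_long(len(data))
#         encoder._fo.write(data)
#
#     BLOCK_WRITERS["mycodec"] = my_codec_write_block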


def _missing_codec_lib(codec, *libraries):
    def missing(encoder, block_bytes, compression_level):
        raise ValueError(
            f"{codec} codec is supported but you need to install one of the "
            + f"following libraries: {libraries}"
        )

    return missing


def snappy_write_block(encoder, block_bytes, compression_level):
    """Write block in "snappy" codec."""
    data = snappy_compress(block_bytes)
    encoder.write_long(len(data) + 4)  # for CRC
    encoder._fo.write(data)
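    # Per the Avro spec, the snappy codec appends a 4-byte, big-endian CRC32
    # of the uncompressed block data; the "+ 4" above accounts for it.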
    encoder.write_crc32(block_bytes)


try:
    from cramjam import snappy

    snappy_compress = snappy.compress_raw
except ImportError:
    try:
        import snappy

        snappy_compress = snappy.compress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_WRITERS["snappy"] = _missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_WRITERS["snappy"] = snappy_write_block
else:
    BLOCK_WRITERS["snappy"] = snappy_write_block


def zstandard_write_block(encoder, block_bytes, compression_level):
    """Write block in "zstandard" codec."""
    if compression_level is not None:
        data = zstandard.ZstdCompressor(level=compression_level).compress(block_bytes)
    else:
        data = zstandard.ZstdCompressor().compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import zstandard
except ImportError:
    BLOCK_WRITERS["zstandard"] = _missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_WRITERS["zstandard"] = zstandard_write_block


def lz4_write_block(encoder, block_bytes, compression_level):
    """Write block in "lz4" codec."""
    data = lz4.block.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import lz4.block
except ImportError:
    BLOCK_WRITERS["lz4"] = _missing_codec_lib("lz4", "lz4")
else:
    BLOCK_WRITERS["lz4"] = lz4_write_block


class GenericWriter(ABC):
    def __init__(self, schema, metadata=None, validator=None, options={}):
        self._named_schemas = {}
        self.validate_fn = _validate if validator else None
        self.metadata = metadata or {}
        self.options = options

        # A schema of None is allowed when appending; in that case self.schema
        # will be set later from the header of the existing file
        if schema is not None:
            self.schema = parse_schema(schema, self._named_schemas)

        if isinstance(schema, dict):
            schema = {
                key: value
                for key, value in schema.items()
                if key not in ("__fastavro_parsed", "__named_schemas")
            }
        elif isinstance(schema, list):
            schemas = []
            for s in schema:
                if isinstance(s, dict):
                    schemas.append(
                        {
                            key: value
                            for key, value in s.items()
                            if key
                            not in (
                                "__fastavro_parsed",
                                "__named_schemas",
                            )
                        }
                    )
                else:
                    schemas.append(s)
            schema = schemas

        self.metadata["avro.schema"] = json.dumps(schema)

    @abstractmethod
    def write(self, record):
        pass

    @abstractmethod
    def flush(self):
        pass


class Writer(GenericWriter):
    def __init__(
        self,
        fo: Union[IO, BinaryEncoder],
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.metadata["avro.codec"] = codec
        if isinstance(fo, BinaryEncoder):
            self.encoder = fo
        else:
            self.encoder = BinaryEncoder(fo)
        self.io = BinaryEncoder(BytesIO())
        self.block_count = 0
        self.sync_interval = sync_interval
        self.compression_level = compression_level

        if _is_appendable(self.encoder._fo):
            # Seek to the beginning to read the header
            self.encoder._fo.seek(0)
            avro_reader = reader(self.encoder._fo)
            header = avro_reader._header

            self._named_schemas = {}
            self.schema = parse_schema(avro_reader.writer_schema, self._named_schemas)

            codec = avro_reader.metadata.get("avro.codec", "null")

            self.sync_marker = header["sync"]

            # Seek to the end of the file
            self.encoder._fo.seek(0, 2)

            self.block_writer = BLOCK_WRITERS[codec]
        else:
            self.sync_marker = sync_marker or urandom(SYNC_SIZE)

            try:
                self.block_writer = BLOCK_WRITERS[codec]
            except KeyError:
                raise ValueError(f"unrecognized codec: {codec}")

            write_header(self.encoder, self.metadata, self.sync_marker)

    def dump(self):
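        """Write the buffered records as a single Avro data block (record
        count, codec-compressed data, sync marker) and reset the buffer."""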
        self.encoder.write_long(self.block_count)
        self.block_writer(self.encoder, self.io._fo.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)
        self.io._fo.truncate(0)
        self.io._fo.seek(0, SEEK_SET)
        self.block_count = 0

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(self.io, record, self.schema, self._named_schemas, "", self.options)
        self.block_count += 1
        if self.io._fo.tell() >= self.sync_interval:
            self.dump()

    def write_block(self, block):
        # Flush any buffered records as a block before writing the given block
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder.write_long(block.num_records)
        self.block_writer(self.encoder, block.bytes_.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)
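
    # Illustrative sketch (not executed): write_block expects an object with
    # ``num_records`` and ``bytes_`` attributes, such as the blocks yielded by
    # fastavro.block_reader, which allows copying blocks between files without
    # decoding the individual records.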

    def flush(self):
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder._fo.flush()


class JSONWriter(GenericWriter):
    def __init__(
        self,
        fo: AvroJSONEncoder,
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        codec_compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.encoder = fo
        self.encoder.configure(self.schema, self._named_schemas)

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(
            self.encoder, record, self.schema, self._named_schemas, "", self.options
        )

    def flush(self):
        self.encoder.flush()


def writer(
    fo: Union[IO, AvroJSONEncoder],
    schema: Schema,
    records: Iterable[Any],
    codec: str = "null",
    sync_interval: int = 1000 * SYNC_SIZE,
    metadata: Optional[Dict[str, str]] = None,
    validator: bool = False,
    sync_marker: bytes = b"",
    codec_compression_level: Optional[int] = None,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write records to fo (stream) according to schema

    Parameters
    ----------
    fo
        Output stream
    schema
        Writer schema
    records
        Records to write. This is commonly a list of the dictionary
        representation of the records, but it can be any iterable
    codec
        Compression codec, can be 'null', 'deflate', 'bzip2', 'xz', or (if the
        corresponding library is installed) 'snappy', 'zstandard', or 'lz4'
    sync_interval
        Approximate size, in bytes, of buffered output after which a block is
        written to the stream
    metadata
        Header metadata
    validator
        If true, validation will be done on the records
    sync_marker
        A byte string used as the avro sync marker. If not provided, a random
        byte string will be used.
    codec_compression_level
        Compression level to use with the specified codec (if the codec
        supports it)
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        from fastavro import writer, parse_schema

        schema = {
            'doc': 'A weather reading.',
            'name': 'Weather',
            'namespace': 'test',
            'type': 'record',
            'fields': [
                {'name': 'station', 'type': 'string'},
                {'name': 'time', 'type': 'long'},
                {'name': 'temp', 'type': 'int'},
            ],
        }
        parsed_schema = parse_schema(schema)

        records = [
            {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
            {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
            {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
            {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
        ]

        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

    The `fo` argument is a file-like object, so another common usage is to
    write to an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer

        fo = BytesIO()
        writer(fo, schema, records)

    Given an existing avro file, it's possible to append to it by re-opening
    the file in `a+b` mode. If the file is only opened in `ab` mode, we aren't
    able to read some of the existing header information and an error will be
    raised. For example::

        # Write initial records
        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

        # Write some more records
        with open('weather.avro', 'a+b') as out:
            writer(out, None, more_records)

    Note: When appending, any schema provided will be ignored since the schema
    in the avro file will be re-used. Therefore it is convenient to just use
    None as the schema.
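
    An alternative codec (any name registered in ``BLOCK_WRITERS``) and, where
    the codec supports it, a compression level can also be requested::

        with open('weather_deflate.avro', 'wb') as out:
            writer(out, parsed_schema, records, codec='deflate',
                   codec_compression_level=7)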
    """
    # Sanity check that records is not a single dictionary (as that is a common
    # mistake and the exception that gets raised is not helpful)
    if isinstance(records, dict):
        raise ValueError('"records" argument should be an iterable, not dict')

    output: Union[JSONWriter, Writer]
    if isinstance(fo, AvroJSONEncoder):
        output = JSONWriter(
            fo,
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )
    else:
        output = Writer(
            BinaryEncoder(fo),
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )

    for record in records:
        output.write(record)
    output.flush()


def schemaless_writer(
    fo: IO,
    schema: Schema,
    record: Any,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write a single record without the schema or header information

    Parameters
    ----------
    fo
        Output file
    schema
        Schema
    record
        Record to write
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'wb') as fp:
            fastavro.schemaless_writer(fp, parsed_schema, record)

    Note: The ``schemaless_writer`` can only write a single record.
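
    To read such a record back, the same schema must be supplied to the
    corresponding reader, e.g. ``fastavro.schemaless_reader``::

        with open('file', 'rb') as fp:
            record = fastavro.schemaless_reader(fp, parsed_schema)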
    """
    named_schemas: NamedSchemas = {}
    schema = parse_schema(schema, named_schemas)

    encoder = BinaryEncoder(fo)
    write_data(
        encoder,
        record,
        schema,
        named_schemas,
        "",
        {
            "strict": strict,
            "strict_allow_default": strict_allow_default,
            "disable_tuple_notation": disable_tuple_notation,
        },
    )
    encoder.flush()