Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/_write_py.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Python code for writing AVRO files"""
3# This code is a modified version of the code at
4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under
5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0)
7from abc import ABC, abstractmethod
8import json
9from io import BytesIO
10from os import urandom, SEEK_SET
11import bz2
12import lzma
13import zlib
14from typing import Union, IO, Iterable, Any, Optional, Dict
15from warnings import warn
17from .const import NAMED_TYPES
18from .io.binary_encoder import BinaryEncoder
19from .io.json_encoder import AvroJSONEncoder
20from .validation import _validate
21from .read import HEADER_SCHEMA, SYNC_SIZE, MAGIC, reader
22from .logical_writers import LOGICAL_WRITERS
23from .schema import extract_record_type, extract_logical_type, parse_schema
24from ._write_common import _is_appendable
25from .types import Schema, NamedSchemas
def write_null(encoder, datum, schema, named_schemas, fname, options):
    """null is written as zero bytes"""
    # Only `encoder` is used; the other parameters exist so that every type
    # writer can be called with the same argument list.
    encoder.write_null()
def write_boolean(encoder, datum, schema, named_schemas, fname, options):
    """A boolean is written as a single byte whose value is either 0 (false) or
    1 (true)."""
    # The actual encoding is delegated to the encoder; `schema`,
    # `named_schemas`, `fname` and `options` are unused here.
    encoder.write_boolean(datum)
def write_int(encoder, datum, schema, named_schemas, fname, options):
    """int and long values are written using variable-length, zig-zag coding."""
    # Delegates to the encoder; only `encoder` and `datum` are used.
    encoder.write_int(datum)
def write_long(encoder, datum, schema, named_schemas, fname, options):
    """int and long values are written using variable-length, zig-zag coding."""
    # Delegates to the encoder; only `encoder` and `datum` are used.
    encoder.write_long(datum)
def write_float(encoder, datum, schema, named_schemas, fname, options):
    """A float is written as 4 bytes. The float is converted into a 32-bit
    integer using a method equivalent to Java's floatToIntBits and then encoded
    in little-endian format."""
    # Delegates to the encoder; only `encoder` and `datum` are used.
    encoder.write_float(datum)
def write_double(encoder, datum, schema, named_schemas, fname, options):
    """A double is written as 8 bytes. The double is converted into a 64-bit
    integer using a method equivalent to Java's doubleToLongBits and then
    encoded in little-endian format."""
    # Delegates to the encoder; only `encoder` and `datum` are used.
    encoder.write_double(datum)
def write_bytes(encoder, datum, schema, named_schemas, fname, options):
    """Bytes are encoded as a long followed by that many bytes of data."""
    # Delegates to the encoder; only `encoder` and `datum` are used.
    encoder.write_bytes(datum)
def write_utf8(encoder, datum, schema, named_schemas, fname, options):
    """A string is encoded as a long followed by that many bytes of UTF-8
    encoded character data."""
    # Delegates to the encoder; only `encoder` and `datum` are used.
    encoder.write_utf8(datum)
def write_crc32(encoder, datum):
    """A 4-byte, big-endian CRC32 checksum"""
    # Unlike the schema-driven writers this takes only the encoder and the
    # data; the checksum computation is left to the encoder.
    encoder.write_crc32(datum)
def write_fixed(encoder, datum, schema, named_schemas, fname, options):
    """Write a fixed value after checking it has the size the schema declares.

    Raises
    ------
    ValueError
        If ``len(datum)`` differs from ``schema["size"]``.
    """
    actual_size = len(datum)
    if actual_size == schema["size"]:
        encoder.write_fixed(datum)
        return
    raise ValueError(
        f"data of length {actual_size} does not match schema size: {schema}"
    )
def write_enum(encoder, datum, schema, named_schemas, fname, options):
    """Write an enum symbol as its zero-based position in ``schema["symbols"]``.

    The position is looked up with ``list.index``, so a symbol that is not
    part of the schema raises ``ValueError``.
    """
    symbols = schema["symbols"]
    encoder.write_enum(symbols.index(datum))
def write_array(encoder, datum, schema, named_schemas, fname, options):
    """Write an array as a single counted block.

    The encoder is told the array starts, then (for a non-empty array) the
    number of items, then every item is written per ``schema["items"]`` with
    ``end_item`` signalled after each one, and finally the array end is
    written. An empty array produces only the start/end calls.

    Per the Avro encoding, arrays are a series of blocks: a long count
    followed by that many items, terminated by a block with count zero. A
    negative count would be followed by the block's byte size; this writer
    only passes the non-negative ``len(datum)`` as the count.
    """
    encoder.write_array_start()
    item_count = len(datum)
    if item_count > 0:
        encoder.write_item_count(item_count)
        item_schema = schema["items"]
        for element in datum:
            write_data(encoder, element, item_schema, named_schemas, fname, options)
            encoder.end_item()
    encoder.write_array_end()
def write_map(encoder, datum, schema, named_schemas, fname, options):
    """Write a map as a single counted block of key/value pairs.

    The encoder is told the map starts, then (for a non-empty map) the number
    of entries, then each key is written as a UTF-8 string followed by its
    value encoded per ``schema["values"]``, and finally the map end is
    written. An empty map produces only the start/end calls.

    Per the Avro encoding, maps are a series of blocks: a long count followed
    by that many key/value pairs, terminated by a block with count zero. A
    negative count would be followed by the block's byte size; this writer
    only passes the non-negative ``len(datum)`` as the count.
    """
    encoder.write_map_start()
    entry_count = len(datum)
    if entry_count > 0:
        encoder.write_item_count(entry_count)
        value_schema = schema["values"]
        for map_key, map_value in datum.items():
            encoder.write_utf8(map_key)
            write_data(
                encoder, map_value, value_schema, named_schemas, fname, options
            )
    encoder.write_map_end()
def write_union(encoder, datum, schema, named_schemas, fname, options):
    """A union is encoded by first writing a long value indicating the
    zero-based position within the union of the schema of its value. The value
    is then encoded per the indicated schema within the union.

    Branch selection works in one of two modes:

    * Tuple notation: when ``datum`` is a tuple (and the
      ``disable_tuple_notation`` option is not set) it is unpacked as
      ``(name, value)`` and the first branch whose name matches is used.
    * Validation: otherwise each branch is checked with ``_validate`` and the
      best match is chosen (see the inline comments for the tie-breaking
      rules).

    Here ``schema`` is the union itself, i.e. a sequence of branch schemas.

    Raises
    ------
    ValueError
        If no branch matches the provided name or value.
    """

    best_match_index = -1
    if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
        # Explicit branch selection: the tuple carries the branch name and
        # the actual value.
        (name, datum) = datum
        for index, candidate in enumerate(schema):
            extracted_type = extract_record_type(candidate)
            # Named types are matched by their "name"; every other branch is
            # matched by its type string.
            if extracted_type in NAMED_TYPES:
                schema_name = candidate["name"]
            else:
                schema_name = extracted_type
            if name == schema_name:
                best_match_index = index
                break

        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            msg = (
                f"provided union type name {name} not found in schema "
                + f"{schema} {field}"
            )
            raise ValueError(msg)
        index = best_match_index
    else:
        # Only used for the error message below.
        pytype = type(datum)
        # Highest number of overlapping field names seen among record
        # branches so far.
        most_fields = -1

        # All of Python's floating point values are doubles, so to
        # avoid loss of precision, we should always prefer 'double'
        # if we are forced to choose between float and double.
        #
        # If 'double' comes before 'float' in the union, then we'll immediately
        # choose it, and don't need to worry. But if 'float' comes before
        # 'double', we don't want to pick it.
        #
        # So, if we ever see 'float', we skim through the rest of the options,
        # just to see if 'double' is a possibility, because we'd prefer it.
        could_be_float = False

        for index, candidate in enumerate(schema):
            if could_be_float:
                if extract_record_type(candidate) == "double":
                    best_match_index = index
                    break
                else:
                    # Nothing except "double" is even worth considering.
                    continue

            if _validate(
                datum,
                candidate,
                named_schemas,
                raise_errors=False,
                field="",
                options=options,
            ):
                record_type = extract_record_type(candidate)
                if record_type == "record":
                    # A record branch with a registered logical writer gets
                    # the datum converted before the field names are compared.
                    # Note that this rebinds `datum` for the rest of the
                    # function.
                    logical_type = extract_logical_type(candidate)
                    if logical_type:
                        prepare = LOGICAL_WRITERS.get(logical_type)
                        if prepare:
                            datum = prepare(datum, candidate)

                    # Among record branches, keep the one sharing the most
                    # field names with the datum; ties keep the earlier
                    # branch because the comparison is strict. The loop does
                    # not break here so later branches are still considered.
                    candidate_fields = set(f["name"] for f in candidate["fields"])
                    datum_fields = set(datum)
                    fields = len(candidate_fields.intersection(datum_fields))
                    if fields > most_fields:
                        best_match_index = index
                        most_fields = fields
                elif record_type == "float":
                    best_match_index = index
                    # Continue in the loop, because it's possible that there's
                    # another candidate which has record type 'double'
                    could_be_float = True
                else:
                    # Any other valid branch wins immediately.
                    best_match_index = index
                    break
        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            raise ValueError(
                f"{repr(datum)} (type {pytype}) do not match {schema} {field}"
            )
        index = best_match_index

    # write data
    # TODO: There should be a way to give just the index
    encoder.write_index(index, schema[index])
    write_data(encoder, datum, schema[index], named_schemas, fname, options)
def _field_accepts_null(field_type):
    """Return True if a missing value for this field type may be written as null."""
    if isinstance(field_type, str):
        # A type given by name must be exactly "null". A substring test would
        # wrongly accept named types such as "nullable_thing".
        return field_type == "null"
    # Unions are lists of branches; other containers keep plain membership
    # semantics.
    return "null" in field_type


def write_record(encoder, datum, schema, named_schemas, fname, options):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema.

    Parameters
    ----------
    encoder
        Encoder the field values are written to
    datum
        Mapping of field name to value
    schema
        Record schema; ``schema["fields"]`` lists the fields in write order
    named_schemas
        Mapping of fullname to schema definition, passed through to
        ``write_data``
    fname
        Unused here; each field is written with its own name
    options
        Mapping of option flags; ``strict`` and ``strict_allow_default`` are
        consulted

    Raises
    ------
    ValueError
        If a strict option is set and the record has fields the schema does
        not declare, if a required field is missing under the strict options,
        or if a missing field has no default and its type does not accept
        null.
    """
    strict = options.get("strict")
    strict_allow_default = options.get("strict_allow_default")

    extras = set(datum) - set(field["name"] for field in schema["fields"])
    if (strict or strict_allow_default) and extras:
        # Sorted so the message does not depend on set iteration order.
        raise ValueError(
            f'record contains more fields than the schema specifies: {", ".join(sorted(extras))}'
        )
    for field in schema["fields"]:
        name = field["name"]
        field_type = field["type"]
        if name not in datum:
            has_default = "default" in field
            if strict or (strict_allow_default and not has_default):
                raise ValueError(
                    f"Field {name} is specified in the schema but missing from the record"
                )
            elif not has_default and not _field_accepts_null(field_type):
                raise ValueError(f"no value and no default for {name}")
        # A missing field falls back to its default, or None if it has none.
        datum_value = datum.get(name, field.get("default"))
        if field_type == "float" or field_type == "double":
            # Handle float values like "NaN"
            datum_value = float(datum_value)
        write_data(
            encoder,
            datum_value,
            field_type,
            named_schemas,
            name,
            options,
        )
# Dispatch table from schema type name to the function that writes a datum of
# that type. Every function here takes
# (encoder, datum, schema, named_schemas, fname, options).
WRITERS = {
    "null": write_null,
    "boolean": write_boolean,
    "string": write_utf8,
    "int": write_int,
    "long": write_long,
    "float": write_float,
    "double": write_double,
    "bytes": write_bytes,
    "fixed": write_fixed,
    "enum": write_enum,
    "array": write_array,
    "map": write_map,
    # "error_union" is written exactly like "union"
    "union": write_union,
    "error_union": write_union,
    # "error" is written exactly like "record"
    "record": write_record,
    "error": write_record,
}
def write_data(encoder, datum, schema, named_schemas, fname, options):
    """Write a datum of data to output stream.

    The schema's type selects a function from ``WRITERS``. A type with no
    entry there is looked up in ``named_schemas`` and the datum is written
    with that schema instead. When the schema has a logical type with a
    registered entry in ``LOGICAL_WRITERS``, the datum is converted by it
    before being written.

    Parameters
    ----------
    encoder: encoder
        Type of encoder (e.g. binary or json)
    datum: object
        Data to write
    schema: dict
        Schema to use
    named_schemas: dict
        Mapping of fullname to schema definition
    fname: str
        Name of the field being written; appended to TypeError messages
    options: dict
        Option flags passed through to the type writers

    Raises
    ------
    TypeError
        Re-raised from the type writer, with ``on field <fname>`` appended to
        the message when ``fname`` is non-empty.
    """
    record_type = extract_record_type(schema)
    logical_type = extract_logical_type(schema)

    type_writer = WRITERS.get(record_type)
    if not type_writer:
        # Not a built-in type name: resolve it through the named schemas.
        return write_data(
            encoder, datum, named_schemas[record_type], named_schemas, "", options
        )

    if logical_type:
        convert = LOGICAL_WRITERS.get(logical_type)
        if convert:
            datum = convert(datum, schema)

    try:
        return type_writer(encoder, datum, schema, named_schemas, fname, options)
    except TypeError as ex:
        if not fname:
            raise
        raise TypeError(f"{ex} on field {fname}")
def write_header(encoder, metadata, sync_marker):
    """Write the container file header to ``encoder``.

    The header holds the magic bytes, the metadata with every value encoded
    to bytes via ``str.encode()``, and the sync marker. It is written with
    ``HEADER_SCHEMA``.
    """
    encoded_meta = {}
    for key, value in metadata.items():
        encoded_meta[key] = value.encode()

    header = {"magic": MAGIC, "meta": encoded_meta, "sync": sync_marker}
    write_data(encoder, header, HEADER_SCHEMA, {}, "", {})
def null_write_block(encoder, block_bytes, compression_level):
    """Write block in "null" codec.

    The block is stored uncompressed: its byte length as a long, then the
    bytes themselves. ``compression_level`` is ignored.
    """
    size = len(block_bytes)
    encoder.write_long(size)
    encoder._fo.write(block_bytes)
def deflate_write_block(encoder, block_bytes, compression_level):
    """Write block in "deflate" codec.

    The block is compressed as a raw deflate stream (RFC 1951) with no zlib
    header or checksum, then written as its byte length (a long) followed by
    the compressed bytes.

    Parameters
    ----------
    encoder
        Encoder providing ``write_long`` and the underlying ``_fo`` stream
    block_bytes
        Uncompressed block data
    compression_level
        zlib compression level, or None for zlib's default
    """
    level = zlib.Z_DEFAULT_COMPRESSION if compression_level is None else compression_level
    # Negative wbits asks zlib for a raw deflate stream. Slicing the output of
    # zlib.compress() with [2:-1] removed the 2-byte header but only one of
    # the four Adler-32 trailer bytes, leaving 3 stray bytes after the data.
    compressor = zlib.compressobj(level, zlib.DEFLATED, -zlib.MAX_WBITS)
    data = compressor.compress(block_bytes) + compressor.flush()
    encoder.write_long(len(data))
    encoder._fo.write(data)
def bzip2_write_block(encoder, block_bytes, compression_level):
    """Write block in "bzip2" codec.

    Writes the compressed length as a long, then the bz2-compressed bytes.
    ``compression_level`` is accepted for signature parity but not used.
    """
    compressed = bz2.compress(block_bytes)
    compressed_size = len(compressed)
    encoder.write_long(compressed_size)
    encoder._fo.write(compressed)
def xz_write_block(encoder, block_bytes, compression_level):
    """Write block in "xz" codec.

    Writes the compressed length as a long, then the lzma-compressed bytes.
    ``compression_level`` is accepted for signature parity but not used.
    """
    compressed = lzma.compress(block_bytes)
    compressed_size = len(compressed)
    encoder.write_long(compressed_size)
    encoder._fo.write(compressed)
# Maps a codec name to the function that writes one block with that codec.
# Each function takes (encoder, block_bytes, compression_level). These four
# codecs rely only on the standard library; further codecs are registered
# below depending on which optional libraries can be imported.
BLOCK_WRITERS = {
    "null": null_write_block,
    "deflate": deflate_write_block,
    "bzip2": bzip2_write_block,
    "xz": xz_write_block,
}
371def _missing_codec_lib(codec, *libraries):
372 def missing(encoder, block_bytes, compression_level):
373 raise ValueError(
374 f"{codec} codec is supported but you need to install one of the "
375 + f"following libraries: {libraries}"
376 )
378 return missing
def snappy_write_block(encoder, block_bytes, compression_level):
    """Write block in "snappy" codec.

    The written length covers the compressed data plus a 4-byte CRC32 of the
    uncompressed block, which follows the compressed data.
    ``compression_level`` is not used.
    """
    compressed = snappy_compress(block_bytes)
    crc_size = 4
    encoder.write_long(len(compressed) + crc_size)
    encoder._fo.write(compressed)
    encoder.write_crc32(block_bytes)
# Pick the snappy implementation: prefer cramjam, fall back to the `snappy`
# module (with a deprecation warning). If neither can be imported, register a
# writer that raises a helpful error when the codec is actually used.
try:
    from cramjam import snappy

    snappy_compress = snappy.compress_raw
except ImportError:
    try:
        import snappy

        snappy_compress = snappy.compress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        # Only cramjam is suggested in the error message, since the `snappy`
        # module fallback is being deprecated.
        BLOCK_WRITERS["snappy"] = _missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_WRITERS["snappy"] = snappy_write_block
else:
    BLOCK_WRITERS["snappy"] = snappy_write_block
def zstandard_write_block(encoder, block_bytes, compression_level):
    """Write block in "zstandard" codec.

    Uses ``compression_level`` as the compressor level when it is not None,
    otherwise the compressor's default. Writes the compressed length as a
    long, then the compressed bytes.
    """
    if compression_level is None:
        compressor = zstandard.ZstdCompressor()
    else:
        compressor = zstandard.ZstdCompressor(level=compression_level)
    compressed = compressor.compress(block_bytes)
    encoder.write_long(len(compressed))
    encoder._fo.write(compressed)
# Register the zstandard codec if the library can be imported; otherwise
# register a writer that raises a helpful error when the codec is used.
try:
    import zstandard
except ImportError:
    BLOCK_WRITERS["zstandard"] = _missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_WRITERS["zstandard"] = zstandard_write_block
def lz4_write_block(encoder, block_bytes, compression_level):
    """Write block in "lz4" codec.

    Writes the compressed length as a long, then the lz4-block-compressed
    bytes. ``compression_level`` is accepted for signature parity but not
    used.
    """
    compressed = lz4.block.compress(block_bytes)
    compressed_size = len(compressed)
    encoder.write_long(compressed_size)
    encoder._fo.write(compressed)
# Register the lz4 codec if the library can be imported; otherwise register a
# writer that raises a helpful error when the codec is used.
try:
    import lz4.block
except ImportError:
    BLOCK_WRITERS["lz4"] = _missing_codec_lib("lz4", "lz4")
else:
    BLOCK_WRITERS["lz4"] = lz4_write_block
class GenericWriter(ABC):
    """Base class for writers: holds the parsed schema, metadata and options.

    Subclasses implement ``write`` (one record) and ``flush``.
    """

    # Keys removed from schema dicts before the schema is serialised into the
    # metadata. NOTE(review): presumably bookkeeping added by parse_schema;
    # confirm against the schema module.
    _INTERNAL_SCHEMA_KEYS = ("__fastavro_parsed", "__named_schemas")

    def __init__(self, schema, metadata=None, validator=None, options=None):
        """
        Parameters
        ----------
        schema
            Writer schema, or None (allowed when appending; ``self.schema``
            is then set later by the subclass)
        metadata
            Optional mapping of header metadata. It is copied, so the
            caller's mapping is never modified.
        validator
            If truthy, ``self.validate_fn`` is set to the validation function,
            otherwise to None
        options
            Mapping of option flags; None means no options
        """
        self._named_schemas = {}
        self.validate_fn = _validate if validator else None
        # Copy the metadata: "avro.schema" (and, in subclasses, other keys)
        # is added below and must not leak into the caller's dict.
        self.metadata = dict(metadata) if metadata else {}
        # A None default avoids sharing one mutable dict between instances.
        self.options = {} if options is None else options

        # A schema of None is allowed when appending and when doing so the
        # self.schema will be updated later
        if schema is not None:
            self.schema = parse_schema(schema, self._named_schemas)

        # Serialise the schema as given by the caller, minus internal keys
        # (top level only for dicts, and for each dict directly in a list).
        if isinstance(schema, dict):
            schema = self._without_internal_keys(schema)
        elif isinstance(schema, list):
            schema = [
                self._without_internal_keys(s) if isinstance(s, dict) else s
                for s in schema
            ]

        self.metadata["avro.schema"] = json.dumps(schema)

    @staticmethod
    def _without_internal_keys(schema_dict):
        """Return a shallow copy of ``schema_dict`` without the internal keys."""
        return {
            key: value
            for key, value in schema_dict.items()
            if key not in GenericWriter._INTERNAL_SCHEMA_KEYS
        }

    @abstractmethod
    def write(self, record):
        """Write a single record."""
        pass

    @abstractmethod
    def flush(self):
        """Flush any pending output."""
        pass
class Writer(GenericWriter):
    """Writer for the binary Avro container format.

    Records are encoded into an in-memory buffer (``self.io``) and written to
    the output as a block (record count, codec-encoded data, sync marker)
    whenever the buffer reaches ``sync_interval`` bytes or on ``flush``.
    """

    def __init__(
        self,
        fo: Union[IO, BinaryEncoder],
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        compression_level: Optional[int] = None,
        options: Optional[Dict[str, bool]] = None,
    ):
        """
        Parameters
        ----------
        fo
            Output stream, or a BinaryEncoder already wrapping one
        schema
            Writer schema; replaced by the file's schema when appending
        codec
            Name of a codec registered in ``BLOCK_WRITERS``; replaced by the
            file's codec when appending
        sync_interval
            Buffered byte count at which a block is written out
        metadata
            Optional header metadata
        validator
            If true, records are validated before being written
        sync_marker
            Sync marker for a new file; random bytes are used when empty
        compression_level
            Level passed to the block writer
        options
            Mapping of option flags; None means no options

        Raises
        ------
        ValueError
            If a new file is being written and ``codec`` is not registered
        """
        # None instead of a `{}` default so instances never share one dict.
        super().__init__(schema, metadata, validator, {} if options is None else options)

        self.metadata["avro.codec"] = codec
        if isinstance(fo, BinaryEncoder):
            self.encoder = fo
        else:
            self.encoder = BinaryEncoder(fo)
        # In-memory buffer holding the records of the block being built.
        self.io = BinaryEncoder(BytesIO())
        self.block_count = 0
        self.sync_interval = sync_interval
        self.compression_level = compression_level

        if _is_appendable(self.encoder._fo):
            # Seek to the beginning to read the header
            self.encoder._fo.seek(0)
            avro_reader = reader(self.encoder._fo)
            header = avro_reader._header

            # When appending, the schema, codec and sync marker of the
            # existing file take precedence over the arguments.
            self._named_schemas = {}
            self.schema = parse_schema(avro_reader.writer_schema, self._named_schemas)

            codec = avro_reader.metadata.get("avro.codec", "null")

            self.sync_marker = header["sync"]

            # Seek to the end of the file (whence=2 is SEEK_END)
            self.encoder._fo.seek(0, 2)

            self.block_writer = BLOCK_WRITERS[codec]
        else:
            self.sync_marker = sync_marker or urandom(SYNC_SIZE)

            try:
                self.block_writer = BLOCK_WRITERS[codec]
            except KeyError:
                raise ValueError(f"unrecognized codec: {codec}")

            write_header(self.encoder, self.metadata, self.sync_marker)

    def dump(self):
        """Write the buffered records as one block and reset the buffer."""
        self.encoder.write_long(self.block_count)
        self.block_writer(self.encoder, self.io._fo.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)
        self.io._fo.truncate(0)
        self.io._fo.seek(0, SEEK_SET)
        self.block_count = 0

    def write(self, record):
        """Validate (if enabled) and buffer one record, dumping a block when
        the buffer has reached ``sync_interval`` bytes."""
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(self.io, record, self.schema, self._named_schemas, "", self.options)
        self.block_count += 1
        if self.io._fo.tell() >= self.sync_interval:
            self.dump()

    def write_block(self, block):
        """Write an already-encoded block of records.

        ``block`` must provide ``num_records`` and ``bytes_`` (an object with
        ``getvalue()``).
        """
        # Clear existing block if there are any records pending
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder.write_long(block.num_records)
        self.block_writer(self.encoder, block.bytes_.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)

    def flush(self):
        """Write any buffered records as a block, then flush the output."""
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder._fo.flush()
class JSONWriter(GenericWriter):
    """Writer that sends records to an ``AvroJSONEncoder``."""

    def __init__(
        self,
        fo: AvroJSONEncoder,
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        codec_compression_level: Optional[int] = None,
        options: Optional[Dict[str, bool]] = None,
    ):
        """
        Parameters
        ----------
        fo
            JSON encoder the records are written to
        schema
            Writer schema
        metadata
            Optional metadata, stored on the writer
        validator
            If true, records are validated before being written
        options
            Mapping of option flags; None means no options
        codec, sync_interval, sync_marker, codec_compression_level
            Accepted so the signature lines up with ``Writer``; not used here
        """
        # None instead of a `{}` default so instances never share one dict.
        super().__init__(schema, metadata, validator, {} if options is None else options)

        self.encoder = fo
        self.encoder.configure(self.schema, self._named_schemas)

    def write(self, record):
        """Validate (if enabled) and write one record to the encoder."""
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(
            self.encoder, record, self.schema, self._named_schemas, "", self.options
        )

    def flush(self):
        """Flush the encoder."""
        self.encoder.flush()
def writer(
    fo: Union[IO, AvroJSONEncoder],
    schema: Schema,
    records: Iterable[Any],
    codec: str = "null",
    sync_interval: int = 1000 * SYNC_SIZE,
    metadata: Optional[Dict[str, str]] = None,
    validator: bool = False,
    sync_marker: bytes = b"",
    codec_compression_level: Optional[int] = None,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write records to fo (stream) according to schema

    Parameters
    ----------
    fo
        Output stream
    schema
        Writer schema
    records
        Records to write. This is commonly a list of the dictionary
        representation of the records, but it can be any iterable
    codec
        Compression codec. 'null', 'deflate', 'bzip2' and 'xz' are always
        available; 'snappy', 'zstandard' and 'lz4' require their optional
        libraries to be installed
    sync_interval
        Size of sync interval
    metadata
        Header metadata
    validator
        If true, validation will be done on the records
    sync_marker
        A byte string used as the avro sync marker. If not provided, a random
        byte string will be used.
    codec_compression_level
        Compression level to use with the specified codec (if the codec
        supports it)
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        from fastavro import writer, parse_schema

        schema = {
            'doc': 'A weather reading.',
            'name': 'Weather',
            'namespace': 'test',
            'type': 'record',
            'fields': [
                {'name': 'station', 'type': 'string'},
                {'name': 'time', 'type': 'long'},
                {'name': 'temp', 'type': 'int'},
            ],
        }
        parsed_schema = parse_schema(schema)

        records = [
            {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
            {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
            {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
            {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
        ]

        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer

        fo = BytesIO()
        writer(fo, schema, records)

    Given an existing avro file, it's possible to append to it by re-opening
    the file in `a+b` mode. If the file is only opened in `ab` mode, we aren't
    able to read some of the existing header information and an error will be
    raised. For example::

        # Write initial records
        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

        # Write some more records
        with open('weather.avro', 'a+b') as out:
            writer(out, None, more_records)

    Note: When appending, any schema provided will be ignored since the schema
    in the avro file will be re-used. Therefore it is convenient to just use
    None as the schema.
    """
    # Passing one record as a dict instead of an iterable of records is a
    # common mistake; fail early with a clear message.
    if isinstance(records, dict):
        raise ValueError('"records" argument should be an iterable, not dict')

    write_options = {
        "strict": strict,
        "strict_allow_default": strict_allow_default,
        "disable_tuple_notation": disable_tuple_notation,
    }

    # A JSON encoder is used as-is; anything else is treated as a binary
    # stream and wrapped in a BinaryEncoder.
    if isinstance(fo, AvroJSONEncoder):
        writer_cls, target = JSONWriter, fo
    else:
        writer_cls, target = Writer, BinaryEncoder(fo)

    output: Union[JSONWriter, Writer] = writer_cls(
        target,
        schema,
        codec,
        sync_interval,
        metadata,
        validator,
        sync_marker,
        codec_compression_level,
        options=write_options,
    )

    for item in records:
        output.write(item)
    output.flush()
def schemaless_writer(
    fo: IO,
    schema: Schema,
    record: Any,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write a single record without the schema or header information

    Parameters
    ----------
    fo
        Output file
    schema
        Schema
    record
        Record to write
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'wb') as fp:
            fastavro.schemaless_writer(fp, parsed_schema, record)

    Note: The ``schemaless_writer`` can only write a single record.
    """
    named_schemas: NamedSchemas = {}
    parsed = parse_schema(schema, named_schemas)

    write_options = {
        "strict": strict,
        "strict_allow_default": strict_allow_default,
        "disable_tuple_notation": disable_tuple_notation,
    }

    binary_encoder = BinaryEncoder(fo)
    write_data(binary_encoder, record, parsed, named_schemas, "", write_options)
    binary_encoder.flush()