Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fastavro/_write_py.py: 26%

308 statements  

1"""Python code for writing AVRO files""" 

2 

3# This code is a modified version of the code at 

4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under 

5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0) 

6 

7from abc import ABC, abstractmethod 

8import json 

9from io import BytesIO 

10from os import urandom, SEEK_SET 

11import bz2 

12import lzma 

13import zlib 

14from typing import Union, IO, Iterable, Any, Optional, Dict 

15from warnings import warn 

16 

17from .const import NAMED_TYPES 

18from .io.binary_encoder import BinaryEncoder 

19from .io.json_encoder import AvroJSONEncoder 

20from .validation import _validate 

21from .read import HEADER_SCHEMA, SYNC_SIZE, MAGIC, reader 

22from .logical_writers import LOGICAL_WRITERS 

23from .schema import extract_record_type, extract_logical_type, parse_schema 

24from ._write_common import _is_appendable 

25from .types import Schema, NamedSchemas 

26 

27 

28def write_null(encoder, datum, schema, named_schemas, fname, options): 

29 """null is written as zero bytes""" 

30 encoder.write_null() 

31 

32 

33def write_boolean(encoder, datum, schema, named_schemas, fname, options): 

34 """A boolean is written as a single byte whose value is either 0 (false) or 

35 1 (true).""" 

36 encoder.write_boolean(datum) 

37 

38 

39def write_int(encoder, datum, schema, named_schemas, fname, options): 

40 """int and long values are written using variable-length, zig-zag coding.""" 

41 encoder.write_int(datum) 

42 

43 

44def write_long(encoder, datum, schema, named_schemas, fname, options): 

45 """int and long values are written using variable-length, zig-zag coding.""" 

46 encoder.write_long(datum) 
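
# Editor's note (illustrative, not part of fastavro): "zig-zag" coding maps
# signed integers onto unsigned ones so that values of small magnitude encode
# to few bytes, e.g. 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3. For a 64-bit value the
# mapping is roughly:
#
#     (n << 1) ^ (n >> 63)
#
# The resulting unsigned value is then written 7 bits per byte
# (variable-length), which is what encoder.write_int/write_long do.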


def write_float(encoder, datum, schema, named_schemas, fname, options):
    """A float is written as 4 bytes. The float is converted into a 32-bit
    integer using a method equivalent to Java's floatToIntBits and then encoded
    in little-endian format."""
    encoder.write_float(datum)


def write_double(encoder, datum, schema, named_schemas, fname, options):
    """A double is written as 8 bytes. The double is converted into a 64-bit
    integer using a method equivalent to Java's doubleToLongBits and then
    encoded in little-endian format."""
    encoder.write_double(datum)


def write_bytes(encoder, datum, schema, named_schemas, fname, options):
    """Bytes are encoded as a long followed by that many bytes of data."""
    encoder.write_bytes(datum)


def write_utf8(encoder, datum, schema, named_schemas, fname, options):
    """A string is encoded as a long followed by that many bytes of UTF-8
    encoded character data."""
    encoder.write_utf8(datum)


def write_crc32(encoder, datum):
    """A 4-byte, big-endian CRC32 checksum"""
    encoder.write_crc32(datum)


def write_fixed(encoder, datum, schema, named_schemas, fname, options):
    """Fixed instances are encoded using the number of bytes declared in the
    schema."""
    if len(datum) != schema["size"]:
        raise ValueError(
            f"data of length {len(datum)} does not match schema size: {schema}"
        )
    encoder.write_fixed(datum)


def write_enum(encoder, datum, schema, named_schemas, fname, options):
    """An enum is encoded by an int, representing the zero-based position of
    the symbol in the schema."""
    index = schema["symbols"].index(datum)
    encoder.write_enum(index)


def write_array(encoder, datum, schema, named_schemas, fname, options):
    """Arrays are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many array
    items. A block with count zero indicates the end of the array. Each item
    is encoded per the array's item schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
    encoder.write_array_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        dtype = schema["items"]
        for item in datum:
            write_data(encoder, item, dtype, named_schemas, fname, options)
            encoder.end_item()
    encoder.write_array_end()


def write_map(encoder, datum, schema, named_schemas, fname, options):
    """Maps are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many key/value
    pairs. A block with count zero indicates the end of the map. Each item is
    encoded per the map's value schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
    encoder.write_map_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        vtype = schema["values"]
        for key, val in datum.items():
            encoder.write_utf8(key)
            write_data(encoder, val, vtype, named_schemas, fname, options)
    encoder.write_map_end()
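
# Editor's note (illustrative, not part of fastavro): with the binary encoder,
# the array ["a"] written against {"type": "array", "items": "string"} comes
# out as a single block followed by an end marker:
#
#     0x02        block count 1 (zig-zag varint)
#     0x02 0x61   the item "a" (string length 1, then its UTF-8 bytes)
#     0x00        block count 0, i.e. end of the array
#
# Maps use the same block framing, with each entry written as a string key
# followed by a value encoded per the map's value schema.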


def write_union(encoder, datum, schema, named_schemas, fname, options):
    """A union is encoded by first writing a long value indicating the
    zero-based position within the union of the schema of its value. The value
    is then encoded per the indicated schema within the union."""

    best_match_index = -1
    if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
        (name, datum) = datum
        for index, candidate in enumerate(schema):
            extracted_type = extract_record_type(candidate)
            if extracted_type in NAMED_TYPES:
                schema_name = candidate["name"]
            else:
                schema_name = extracted_type
            if name == schema_name:
                best_match_index = index
                break

        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            msg = (
                f"provided union type name {name} not found in schema "
                + f"{schema} {field}"
            )
            raise ValueError(msg)
        index = best_match_index
    else:
        pytype = type(datum)
        most_fields = -1

        # All of Python's floating point values are doubles, so to
        # avoid loss of precision, we should always prefer 'double'
        # if we are forced to choose between float and double.
        #
        # If 'double' comes before 'float' in the union, then we'll immediately
        # choose it, and don't need to worry. But if 'float' comes before
        # 'double', we don't want to pick it.
        #
        # So, if we ever see 'float', we skim through the rest of the options,
        # just to see if 'double' is a possibility, because we'd prefer it.
        could_be_float = False

        for index, candidate in enumerate(schema):
            if could_be_float:
                if extract_record_type(candidate) == "double":
                    best_match_index = index
                    break
                else:
                    # Nothing except "double" is even worth considering.
                    continue

            if _validate(
                datum,
                candidate,
                named_schemas,
                raise_errors=False,
                field="",
                options=options,
            ):
                record_type = extract_record_type(candidate)
                if record_type == "record":
                    logical_type = extract_logical_type(candidate)
                    if logical_type:
                        prepare = LOGICAL_WRITERS.get(logical_type)
                        if prepare:
                            datum = prepare(datum, candidate)

                    candidate_fields = set(f["name"] for f in candidate["fields"])
                    datum_fields = set(datum)
                    fields = len(candidate_fields.intersection(datum_fields))
                    if fields > most_fields:
                        best_match_index = index
                        most_fields = fields
                elif record_type == "float":
                    best_match_index = index
                    # Continue in the loop, because it's possible that there's
                    # another candidate which has record type 'double'
                    could_be_float = True
                else:
                    best_match_index = index
                    break
        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            raise ValueError(
                f"{repr(datum)} (type {pytype}) do not match {schema} {field}"
            )
        index = best_match_index

    # write data
    # TODO: There should be a way to give just the index
    encoder.write_index(index, schema[index])
    write_data(encoder, datum, schema[index], named_schemas, fname, options)
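
# Editor's note (illustrative, not part of fastavro): when a union contains
# several named types, a datum can name its branch explicitly via the tuple
# notation handled above, e.g.
#
#     ("test.Weather", {"station": "011990-99999", "time": 1, "temp": 0})
#
# Without the tuple, write_union validates the datum against each candidate
# schema and, for records, picks the branch whose field names overlap the most.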


def write_record(encoder, datum, schema, named_schemas, fname, options):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema."""
    extras = set(datum) - set(field["name"] for field in schema["fields"])
    if (options.get("strict") or options.get("strict_allow_default")) and extras:
        raise ValueError(
            f'record contains more fields than the schema specifies: {", ".join(extras)}'
        )
    for field in schema["fields"]:
        name = field["name"]
        field_type = field["type"]
        if name not in datum:
            if options.get("strict") or (
                options.get("strict_allow_default") and "default" not in field
            ):
                raise ValueError(
                    f"Field {name} is specified in the schema but missing from the record"
                )
            elif "default" not in field and "null" not in field_type:
                raise ValueError(f"no value and no default for {name}")
        datum_value = datum.get(name, field.get("default"))
        if field_type == "float" or field_type == "double":
            # Handle float values like "NaN"
            datum_value = float(datum_value)
        write_data(
            encoder,
            datum_value,
            field_type,
            named_schemas,
            name,
            options,
        )


WRITERS = {
    "null": write_null,
    "boolean": write_boolean,
    "string": write_utf8,
    "int": write_int,
    "long": write_long,
    "float": write_float,
    "double": write_double,
    "bytes": write_bytes,
    "fixed": write_fixed,
    "enum": write_enum,
    "array": write_array,
    "map": write_map,
    "union": write_union,
    "error_union": write_union,
    "record": write_record,
    "error": write_record,
}


def write_data(encoder, datum, schema, named_schemas, fname, options):
    """Write a datum of data to output stream.

    Parameters
    ----------
    encoder: encoder
        Type of encoder (e.g. binary or json)
    datum: object
        Data to write
    schema: dict
        Schema to use
    named_schemas: dict
        Mapping of fullname to schema definition
    """

    record_type = extract_record_type(schema)
    logical_type = extract_logical_type(schema)

    fn = WRITERS.get(record_type)
    if fn:
        if logical_type:
            prepare = LOGICAL_WRITERS.get(logical_type)
            if prepare:
                datum = prepare(datum, schema)
        try:
            return fn(encoder, datum, schema, named_schemas, fname, options)
        except TypeError as ex:
            if fname:
                raise TypeError(f"{ex} on field {fname}")
            raise
    else:
        return write_data(
            encoder, datum, named_schemas[record_type], named_schemas, "", options
        )


def write_header(encoder, metadata, sync_marker):
    header = {
        "magic": MAGIC,
        "meta": {key: value.encode() for key, value in metadata.items()},
        "sync": sync_marker,
    }
    write_data(encoder, header, HEADER_SCHEMA, {}, "", {})


def null_write_block(encoder, block_bytes, compression_level):
    """Write block in "null" codec."""
    encoder.write_long(len(block_bytes))
    encoder._fo.write(block_bytes)


def deflate_write_block(encoder, block_bytes, compression_level):
    """Write block in "deflate" codec."""
    # The first two characters and last character are zlib
    # wrappers around deflate data.
    if compression_level is not None:
        data = zlib.compress(block_bytes, compression_level)[2:-1]
    else:
        data = zlib.compress(block_bytes)[2:-1]
    encoder.write_long(len(data))
    encoder._fo.write(data)


def bzip2_write_block(encoder, block_bytes, compression_level):
    """Write block in "bzip2" codec."""
    data = bz2.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


def xz_write_block(encoder, block_bytes, compression_level):
    """Write block in "xz" codec."""
    data = lzma.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


BLOCK_WRITERS = {
    "null": null_write_block,
    "deflate": deflate_write_block,
    "bzip2": bzip2_write_block,
    "xz": xz_write_block,
}


def _missing_codec_lib(codec, *libraries):
    def missing(encoder, block_bytes, compression_level):
        raise ValueError(
            f"{codec} codec is supported but you need to install one of the "
            + f"following libraries: {libraries}"
        )

    return missing


def snappy_write_block(encoder, block_bytes, compression_level):
    """Write block in "snappy" codec."""
    data = snappy_compress(block_bytes)
    encoder.write_long(len(data) + 4)  # for CRC
    encoder._fo.write(data)
    encoder.write_crc32(block_bytes)
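
# Editor's note (illustrative, not part of fastavro): a snappy-compressed block
# in an Avro container file is framed as the compressed payload followed by a
# 4-byte big-endian CRC32 of the *uncompressed* data, which is why the declared
# block length above is len(data) + 4 and the CRC is taken over block_bytes.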


try:
    from cramjam import snappy

    snappy_compress = snappy.compress_raw
except ImportError:
    try:
        import snappy

        snappy_compress = snappy.compress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_WRITERS["snappy"] = _missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_WRITERS["snappy"] = snappy_write_block
else:
    BLOCK_WRITERS["snappy"] = snappy_write_block


def zstandard_write_block(encoder, block_bytes, compression_level):
    """Write block in "zstandard" codec."""
    if compression_level is not None:
        data = zstandard.ZstdCompressor(level=compression_level).compress(block_bytes)
    else:
        data = zstandard.ZstdCompressor().compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import zstandard
except ImportError:
    BLOCK_WRITERS["zstandard"] = _missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_WRITERS["zstandard"] = zstandard_write_block


def lz4_write_block(encoder, block_bytes, compression_level):
    """Write block in "lz4" codec."""
    data = lz4.block.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import lz4.block
except ImportError:
    BLOCK_WRITERS["lz4"] = _missing_codec_lib("lz4", "lz4")
else:
    BLOCK_WRITERS["lz4"] = lz4_write_block


class GenericWriter(ABC):
    def __init__(self, schema, metadata=None, validator=None, options={}):
        self._named_schemas = {}
        self.validate_fn = _validate if validator else None
        self.metadata = metadata or {}
        self.options = options

        # A schema of None is allowed when appending and when doing so the
        # self.schema will be updated later
        if schema is not None:
            self.schema = parse_schema(schema, self._named_schemas)

        if isinstance(schema, dict):
            schema = {
                key: value
                for key, value in schema.items()
                if key not in ("__fastavro_parsed", "__named_schemas")
            }
        elif isinstance(schema, list):
            schemas = []
            for s in schema:
                if isinstance(s, dict):
                    schemas.append(
                        {
                            key: value
                            for key, value in s.items()
                            if key
                            not in (
                                "__fastavro_parsed",
                                "__named_schemas",
                            )
                        }
                    )
                else:
                    schemas.append(s)
            schema = schemas

        self.metadata["avro.schema"] = json.dumps(schema)

    @abstractmethod
    def write(self, record):
        pass

    @abstractmethod
    def flush(self):
        pass


class Writer(GenericWriter):
    def __init__(
        self,
        fo: Union[IO, BinaryEncoder],
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.metadata["avro.codec"] = codec
        if isinstance(fo, BinaryEncoder):
            self.encoder = fo
        else:
            self.encoder = BinaryEncoder(fo)
        self.io = BinaryEncoder(BytesIO())
        self.block_count = 0
        self.sync_interval = sync_interval
        self.compression_level = compression_level

        if _is_appendable(self.encoder._fo):
            # Seek to the beginning to read the header
            self.encoder._fo.seek(0)
            avro_reader = reader(self.encoder._fo)
            header = avro_reader._header

            self._named_schemas = {}
            self.schema = parse_schema(avro_reader.writer_schema, self._named_schemas)

            codec = avro_reader.metadata.get("avro.codec", "null")

            self.sync_marker = header["sync"]

            # Seek to the end of the file
            self.encoder._fo.seek(0, 2)

            self.block_writer = BLOCK_WRITERS[codec]
        else:
            self.sync_marker = sync_marker or urandom(SYNC_SIZE)

            try:
                self.block_writer = BLOCK_WRITERS[codec]
            except KeyError:
                raise ValueError(f"unrecognized codec: {codec}")

            write_header(self.encoder, self.metadata, self.sync_marker)

    def dump(self):
        self.encoder.write_long(self.block_count)
        self.block_writer(self.encoder, self.io._fo.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)
        self.io._fo.truncate(0)
        self.io._fo.seek(0, SEEK_SET)
        self.block_count = 0

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(self.io, record, self.schema, self._named_schemas, "", self.options)
        self.block_count += 1
        if self.io._fo.tell() >= self.sync_interval:
            self.dump()

    def write_block(self, block):
        # Clear existing block if there are any records pending
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder.write_long(block.num_records)
        self.block_writer(self.encoder, block.bytes_.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)

    def flush(self):
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder._fo.flush()


class JSONWriter(GenericWriter):
    def __init__(
        self,
        fo: AvroJSONEncoder,
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        codec_compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.encoder = fo
        self.encoder.configure(self.schema, self._named_schemas)

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(
            self.encoder, record, self.schema, self._named_schemas, "", self.options
        )

    def flush(self):
        self.encoder.flush()


def writer(
    fo: Union[IO, AvroJSONEncoder],
    schema: Schema,
    records: Iterable[Any],
    codec: str = "null",
    sync_interval: int = 1000 * SYNC_SIZE,
    metadata: Optional[Dict[str, str]] = None,
    validator: bool = False,
    sync_marker: bytes = b"",
    codec_compression_level: Optional[int] = None,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write records to fo (stream) according to schema

    Parameters
    ----------
    fo
        Output stream
    schema
        Writer schema
    records
        Records to write. This is commonly a list of the dictionary
        representation of the records, but it can be any iterable
    codec
        Compression codec, can be 'null', 'deflate' or 'snappy' (if installed)
    sync_interval
        Size of sync interval
    metadata
        Header metadata
    validator
        If true, validation will be done on the records
    sync_marker
        A byte string used as the avro sync marker. If not provided, a random
        byte string will be used.
    codec_compression_level
        Compression level to use with the specified codec (if the codec
        supports it)
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        from fastavro import writer, parse_schema

        schema = {
            'doc': 'A weather reading.',
            'name': 'Weather',
            'namespace': 'test',
            'type': 'record',
            'fields': [
                {'name': 'station', 'type': 'string'},
                {'name': 'time', 'type': 'long'},
                {'name': 'temp', 'type': 'int'},
            ],
        }
        parsed_schema = parse_schema(schema)

        records = [
            {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
            {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
            {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
            {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
        ]

        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer

        fo = BytesIO()
        writer(fo, schema, records)

    Given an existing avro file, it's possible to append to it by re-opening
    the file in `a+b` mode. If the file is only opened in `ab` mode, we aren't
    able to read some of the existing header information and an error will be
    raised. For example::

        # Write initial records
        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

        # Write some more records
        with open('weather.avro', 'a+b') as out:
            writer(out, None, more_records)

    Note: When appending, any schema provided will be ignored since the schema
    in the avro file will be re-used. Therefore it is convenient to just use
    None as the schema.
    """
    # Sanity check that records is not a single dictionary (as that is a common
    # mistake and the exception that gets raised is not helpful)
    if isinstance(records, dict):
        raise ValueError('"records" argument should be an iterable, not dict')

    output: Union[JSONWriter, Writer]
    if isinstance(fo, AvroJSONEncoder):
        output = JSONWriter(
            fo,
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )
    else:
        output = Writer(
            BinaryEncoder(fo),
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )

    for record in records:
        output.write(record)
    output.flush()


def schemaless_writer(
    fo: IO,
    schema: Schema,
    record: Any,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write a single record without the schema or header information

    Parameters
    ----------
    fo
        Output file
    schema
        Schema
    record
        Record to write
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'wb') as fp:
            fastavro.schemaless_writer(fp, parsed_schema, record)

    Note: The ``schemaless_writer`` can only write a single record.
    """
    named_schemas: NamedSchemas = {}
    schema = parse_schema(schema, named_schemas)

    encoder = BinaryEncoder(fo)
    write_data(
        encoder,
        record,
        schema,
        named_schemas,
        "",
        {
            "strict": strict,
            "strict_allow_default": strict_allow_default,
            "disable_tuple_notation": disable_tuple_notation,
        },
    )
    encoder.flush()
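

# Editor's note: the block below is an illustrative round-trip sketch added for
# this write-up; it is not part of the original fastavro module. It writes two
# records to an in-memory buffer with writer() and reads them back with the
# reader imported above.
if __name__ == "__main__":  # pragma: no cover
    _demo_schema = parse_schema(
        {
            "name": "Demo",
            "type": "record",
            "fields": [{"name": "n", "type": "int"}],
        }
    )
    _buf = BytesIO()
    writer(_buf, _demo_schema, [{"n": 1}, {"n": 2}])
    _buf.seek(0)
    print(list(reader(_buf)))  # -> [{'n': 1}, {'n': 2}]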