Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fastavro/_write_py.py: 26%

312 statements  

1"""Python code for writing AVRO files""" 

2 

3# This code is a modified version of the code at 

4# http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ which is under 

5# Apache 2.0 license (http://www.apache.org/licenses/LICENSE-2.0) 

6 

7from abc import ABC, abstractmethod 

8import json 

9from io import BytesIO 

10from os import urandom, SEEK_SET 

11import bz2 

12import lzma 

13import zlib 

14from typing import Union, IO, Iterable, Any, Optional, Dict 

15from warnings import warn 

16 

17from .const import NAMED_TYPES 

18from .io.binary_encoder import BinaryEncoder 

19from .io.json_encoder import AvroJSONEncoder 

20from .validation import _validate 

21from .read import HEADER_SCHEMA, SYNC_SIZE, MAGIC, reader 

22from .logical_writers import LOGICAL_WRITERS 

23from .schema import extract_record_type, extract_logical_type, parse_schema 

24from ._write_common import _is_appendable 

25from .types import Schema, NamedSchemas 

26 

27 

28def write_null(encoder, datum, schema, named_schemas, fname, options): 

29 """null is written as zero bytes""" 

30 encoder.write_null() 

31 

32 

33def write_boolean(encoder, datum, schema, named_schemas, fname, options): 

34 """A boolean is written as a single byte whose value is either 0 (false) or 

35 1 (true).""" 

36 encoder.write_boolean(datum) 

37 

38 

39def write_int(encoder, datum, schema, named_schemas, fname, options): 

40 """int and long values are written using variable-length, zig-zag coding.""" 

41 encoder.write_int(datum) 

42 

43 

44def write_long(encoder, datum, schema, named_schemas, fname, options): 

45 """int and long values are written using variable-length, zig-zag coding.""" 

46 encoder.write_long(datum) 

47 

48 
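# --- Illustrative sketch, not used by fastavro ------------------------------
# The zig-zag/varint coding mentioned above maps integers with small
# magnitude to short byte sequences.  The helper below re-derives it purely
# for documentation; the real work happens in the encoder's write_int and
# write_long methods.
def _zigzag_varint_sketch(n: int) -> bytes:
    """Return the zig-zag varint encoding of a 64-bit signed integer."""
    zz = (n << 1) ^ (n >> 63)  # zig-zag: 0, -1, 1, -2, 2, ... -> 0, 1, 2, 3, 4, ...
    out = bytearray()
    while zz & ~0x7F:  # emit 7 bits per byte, least-significant group first
        out.append((zz & 0x7F) | 0x80)
        zz >>= 7
    out.append(zz)
    return bytes(out)


# _zigzag_varint_sketch(0)  == b"\x00"
# _zigzag_varint_sketch(-1) == b"\x01"
# _zigzag_varint_sketch(1)  == b"\x02"
# _zigzag_varint_sketch(64) == b"\x80\x01"

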

def write_float(encoder, datum, schema, named_schemas, fname, options):
    """A float is written as 4 bytes. The float is converted into a 32-bit
    integer using a method equivalent to Java's floatToIntBits and then encoded
    in little-endian format."""
    encoder.write_float(datum)


def write_double(encoder, datum, schema, named_schemas, fname, options):
    """A double is written as 8 bytes. The double is converted into a 64-bit
    integer using a method equivalent to Java's doubleToLongBits and then
    encoded in little-endian format."""
    encoder.write_double(datum)


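# Illustrative only: the layouts described above are the standard
# little-endian IEEE-754 encodings, i.e. what struct.pack produces with the
# "<f" and "<d" formats.
def _float_layout_sketch() -> None:
    import struct

    assert struct.pack("<f", 1.0) == b"\x00\x00\x80?"  # 4 bytes, matching write_float
    assert struct.pack("<d", 1.0) == b"\x00\x00\x00\x00\x00\x00\xf0?"  # 8 bytes, matching write_double

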

def write_bytes(encoder, datum, schema, named_schemas, fname, options):
    """Bytes are encoded as a long followed by that many bytes of data."""
    encoder.write_bytes(datum)


def write_utf8(encoder, datum, schema, named_schemas, fname, options):
    """A string is encoded as a long followed by that many bytes of UTF-8
    encoded character data."""
    encoder.write_utf8(datum)


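# Illustrative only: with the binary encoder, bytes and strings are a zig-zag
# varint length followed by the raw bytes.  For example the string "foo" is
# written as b"\x06foo" (0x06 is the varint for length 3), and the bytes
# value b"\x01\x02" is written as b"\x04\x01\x02".

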

def write_crc32(encoder, datum):
    """A 4-byte, big-endian CRC32 checksum"""
    encoder.write_crc32(datum)


def write_fixed(encoder, datum, schema, named_schemas, fname, options):
    """Fixed instances are encoded using the number of bytes declared in the
    schema."""
    if len(datum) != schema["size"]:
        raise ValueError(
            f"data of length {len(datum)} does not match schema size: {schema}"
        )
    encoder.write_fixed(datum)


def write_enum(encoder, datum, schema, named_schemas, fname, options):
    """An enum is encoded by an int, representing the zero-based position of
    the symbol in the schema."""
    index = schema["symbols"].index(datum)
    encoder.write_enum(index)


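# Illustrative only: for an enum schema with symbols ["RED", "GREEN", "BLUE"],
# writing "GREEN" stores its zero-based index 1, which the binary encoder
# emits as the single byte 0x02 (zig-zag varint).  A fixed of size 16 writes
# exactly 16 raw bytes with no length prefix.

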

def write_array(encoder, datum, schema, named_schemas, fname, options):
    """Arrays are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many array
    items. A block with count zero indicates the end of the array. Each item
    is encoded per the array's item schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
    encoder.write_array_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        dtype = schema["items"]
        for item in datum:
            write_data(encoder, item, dtype, named_schemas, fname, options)
            encoder.end_item()
    encoder.write_array_end()


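# Illustrative only: with the binary encoder, the int array [1, 2, 3] becomes
# a single block -- item count 3 (0x06), the items 0x02 0x04 0x06, and the
# terminating zero count -- i.e. b"\x06\x02\x04\x06\x00".

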

def write_map(encoder, datum, schema, named_schemas, fname, options):
    """Maps are encoded as a series of blocks.

    Each block consists of a long count value, followed by that many key/value
    pairs. A block with count zero indicates the end of the map. Each item is
    encoded per the map's value schema.

    If a block's count is negative, then the count is followed immediately by a
    long block size, indicating the number of bytes in the block. The actual
    count in this case is the absolute value of the count written."""
    encoder.write_map_start()
    if len(datum) > 0:
        encoder.write_item_count(len(datum))
        vtype = schema["values"]
        for key, val in datum.items():
            encoder.write_utf8(key)
            write_data(encoder, val, vtype, named_schemas, fname, options)
    encoder.write_map_end()


def write_union(encoder, datum, schema, named_schemas, fname, options):
    """A union is encoded by first writing a long value indicating the
    zero-based position within the union of the schema of its value. The value
    is then encoded per the indicated schema within the union."""

    best_match_index = -1
    if isinstance(datum, tuple) and not options.get("disable_tuple_notation"):
        (name, datum) = datum
        for index, candidate in enumerate(schema):
            extracted_type = extract_record_type(candidate)
            if extracted_type in NAMED_TYPES:
                schema_name = candidate["name"]
            else:
                schema_name = extracted_type
            if name == schema_name:
                best_match_index = index
                break

        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            msg = (
                f"provided union type name {name} not found in schema "
                + f"{schema} {field}"
            )
            raise ValueError(msg)
        index = best_match_index
    else:
        pytype = type(datum)
        most_fields = -1

        # All of Python's floating point values are doubles, so to
        # avoid loss of precision, we should always prefer 'double'
        # if we are forced to choose between float and double.
        #
        # If 'double' comes before 'float' in the union, then we'll immediately
        # choose it, and don't need to worry. But if 'float' comes before
        # 'double', we don't want to pick it.
        #
        # So, if we ever see 'float', we skim through the rest of the options,
        # just to see if 'double' is a possibility, because we'd prefer it.
        could_be_float = False

        for index, candidate in enumerate(schema):
            if could_be_float:
                if extract_record_type(candidate) == "double":
                    best_match_index = index
                    break
                else:
                    # Nothing except "double" is even worth considering.
                    continue

            if _validate(
                datum,
                candidate,
                named_schemas,
                raise_errors=False,
                field="",
                options=options,
            ):
                record_type = extract_record_type(candidate)
                if record_type in named_schemas:
                    # Convert named record types into their full schema so that we can check most_fields
                    candidate = named_schemas[record_type]
                    record_type = extract_record_type(candidate)

                if record_type == "record":
                    logical_type = extract_logical_type(candidate)
                    if logical_type:
                        prepare = LOGICAL_WRITERS.get(logical_type)
                        if prepare:
                            datum = prepare(datum, candidate)

                    candidate_fields = set(f["name"] for f in candidate["fields"])
                    datum_fields = set(datum)
                    fields = len(candidate_fields.intersection(datum_fields))
                    if fields > most_fields:
                        best_match_index = index
                        most_fields = fields
                elif record_type == "float":
                    best_match_index = index
                    # Continue in the loop, because it's possible that there's
                    # another candidate which has record type 'double'
                    could_be_float = True
                else:
                    best_match_index = index
                    break
        if best_match_index == -1:
            field = f"on field {fname}" if fname else ""
            raise ValueError(
                f"{repr(datum)} (type {pytype}) do not match {schema} {field}"
            )
        index = best_match_index

    # write data
    # TODO: There should be a way to give just the index
    encoder.write_index(index, schema[index])
    write_data(encoder, datum, schema[index], named_schemas, fname, options)


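# --- Illustrative sketch, not part of this module ---------------------------
# The "tuple notation" branch above lets callers pick a union branch
# explicitly instead of relying on the best-match search.  A minimal usage
# sketch via the public fastavro API (the Car/Truck names are made up):
def _tuple_notation_sketch() -> bytes:
    from io import BytesIO

    import fastavro

    schema = {
        "name": "Vehicle",
        "type": "record",
        "fields": [
            {
                "name": "engine",
                "type": [
                    {"name": "Car", "type": "record", "fields": [{"name": "doors", "type": "int"}]},
                    {"name": "Truck", "type": "record", "fields": [{"name": "doors", "type": "int"}]},
                ],
            }
        ],
    }
    buf = BytesIO()
    # Car and Truck have identical fields, so the best-match search could pick
    # either; the ("Truck", ...) tuple forces the second branch.
    fastavro.schemaless_writer(buf, schema, {"engine": ("Truck", {"doors": 2})})
    return buf.getvalue()

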

def write_record(encoder, datum, schema, named_schemas, fname, options):
    """A record is encoded by encoding the values of its fields in the order
    that they are declared. In other words, a record is encoded as just the
    concatenation of the encodings of its fields. Field values are encoded per
    their schema."""
    extras = set(datum) - set(field["name"] for field in schema["fields"])
    if (options.get("strict") or options.get("strict_allow_default")) and extras:
        raise ValueError(
            f'record contains more fields than the schema specifies: {", ".join(extras)}'
        )
    for field in schema["fields"]:
        name = field["name"]
        field_type = field["type"]
        if name not in datum:
            if options.get("strict") or (
                options.get("strict_allow_default") and "default" not in field
            ):
                raise ValueError(
                    f"Field {name} is specified in the schema but missing from the record"
                )
            elif "default" not in field and "null" not in field_type:
                raise ValueError(f"no value and no default for {name}")
        datum_value = datum.get(name, field.get("default"))
        if field_type == "float" or field_type == "double":
            # Handle float values like "NaN"
            datum_value = float(datum_value)
        write_data(
            encoder,
            datum_value,
            field_type,
            named_schemas,
            name,
            options,
        )



WRITERS = {
    "null": write_null,
    "boolean": write_boolean,
    "string": write_utf8,
    "int": write_int,
    "long": write_long,
    "float": write_float,
    "double": write_double,
    "bytes": write_bytes,
    "fixed": write_fixed,
    "enum": write_enum,
    "array": write_array,
    "map": write_map,
    "union": write_union,
    "error_union": write_union,
    "record": write_record,
    "error": write_record,
}



def write_data(encoder, datum, schema, named_schemas, fname, options):
    """Write a datum to the output stream.

    Parameters
    ----------
    encoder: encoder
        Type of encoder (e.g. binary or json)
    datum: object
        Data to write
    schema: dict
        Schema to use
    named_schemas: dict
        Mapping of fullname to schema definition
    fname: str
        Name of the field being written; used in error messages
    options: dict
        Writer options (e.g. strict, strict_allow_default,
        disable_tuple_notation)
    """

    record_type = extract_record_type(schema)
    logical_type = extract_logical_type(schema)

    fn = WRITERS.get(record_type)
    if fn:
        if logical_type:
            prepare = LOGICAL_WRITERS.get(logical_type)
            if prepare:
                datum = prepare(datum, schema)
        try:
            return fn(encoder, datum, schema, named_schemas, fname, options)
        except TypeError as ex:
            if fname:
                raise TypeError(f"{ex} on field {fname}")
            raise
    else:
        return write_data(
            encoder, datum, named_schemas[record_type], named_schemas, "", options
        )



def write_header(encoder, metadata, sync_marker):
    header = {
        "magic": MAGIC,
        "meta": {key: value.encode() for key, value in metadata.items()},
        "sync": sync_marker,
    }
    write_data(encoder, header, HEADER_SCHEMA, {}, "", {})


def null_write_block(encoder, block_bytes, compression_level):
    """Write block in "null" codec."""
    encoder.write_long(len(block_bytes))
    encoder._fo.write(block_bytes)


def deflate_write_block(encoder, block_bytes, compression_level):
    """Write block in "deflate" codec."""
    # The first two characters and last character are zlib
    # wrappers around deflate data.
    if compression_level is not None:
        data = zlib.compress(block_bytes, compression_level)[2:-1]
    else:
        data = zlib.compress(block_bytes)[2:-1]
    encoder.write_long(len(data))
    encoder._fo.write(data)


def bzip2_write_block(encoder, block_bytes, compression_level):
    """Write block in "bzip2" codec."""
    data = bz2.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


def xz_write_block(encoder, block_bytes, compression_level):
    """Write block in "xz" codec."""
    data = lzma.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


BLOCK_WRITERS = {
    "null": null_write_block,
    "deflate": deflate_write_block,
    "bzip2": bzip2_write_block,
    "xz": xz_write_block,
}


def _missing_codec_lib(codec, *libraries):
    def missing(encoder, block_bytes, compression_level):
        raise ValueError(
            f"{codec} codec is supported but you need to install one of the "
            + f"following libraries: {libraries}"
        )

    return missing


def snappy_write_block(encoder, block_bytes, compression_level):
    """Write block in "snappy" codec."""
    data = snappy_compress(block_bytes)
    encoder.write_long(len(data) + 4)  # for CRC
    encoder._fo.write(data)
    encoder.write_crc32(block_bytes)


try:
    from cramjam import snappy

    snappy_compress = snappy.compress_raw
except ImportError:
    try:
        import snappy

        snappy_compress = snappy.compress
        warn(
            "Snappy compression will use `cramjam` in the future. Please make sure you have `cramjam` installed",
            DeprecationWarning,
        )
    except ImportError:
        BLOCK_WRITERS["snappy"] = _missing_codec_lib("snappy", "cramjam")
    else:
        BLOCK_WRITERS["snappy"] = snappy_write_block
else:
    BLOCK_WRITERS["snappy"] = snappy_write_block


def zstandard_write_block(encoder, block_bytes, compression_level):
    """Write block in "zstandard" codec."""
    if compression_level is not None:
        data = zstandard.ZstdCompressor(level=compression_level).compress(block_bytes)
    else:
        data = zstandard.ZstdCompressor().compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import zstandard
except ImportError:
    BLOCK_WRITERS["zstandard"] = _missing_codec_lib("zstandard", "zstandard")
else:
    BLOCK_WRITERS["zstandard"] = zstandard_write_block


def lz4_write_block(encoder, block_bytes, compression_level):
    """Write block in "lz4" codec."""
    data = lz4.block.compress(block_bytes)
    encoder.write_long(len(data))
    encoder._fo.write(data)


try:
    import lz4.block
except ImportError:
    BLOCK_WRITERS["lz4"] = _missing_codec_lib("lz4", "lz4")
else:
    BLOCK_WRITERS["lz4"] = lz4_write_block



class GenericWriter(ABC):
    def __init__(self, schema, metadata=None, validator=None, options={}):
        self._named_schemas = {}
        self.validate_fn = _validate if validator else None
        self.metadata = metadata or {}
        self.options = options

        # A schema of None is allowed when appending and when doing so the
        # self.schema will be updated later
        if schema is not None:
            self.schema = parse_schema(schema, self._named_schemas)

        if isinstance(schema, dict):
            schema = {
                key: value
                for key, value in schema.items()
                if key not in ("__fastavro_parsed", "__named_schemas")
            }
        elif isinstance(schema, list):
            schemas = []
            for s in schema:
                if isinstance(s, dict):
                    schemas.append(
                        {
                            key: value
                            for key, value in s.items()
                            if key
                            not in (
                                "__fastavro_parsed",
                                "__named_schemas",
                            )
                        }
                    )
                else:
                    schemas.append(s)
            schema = schemas

        self.metadata["avro.schema"] = json.dumps(schema)

    @abstractmethod
    def write(self, record):
        pass

    @abstractmethod
    def flush(self):
        pass



class Writer(GenericWriter):
    def __init__(
        self,
        fo: Union[IO, BinaryEncoder],
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.metadata["avro.codec"] = codec
        if isinstance(fo, BinaryEncoder):
            self.encoder = fo
        else:
            self.encoder = BinaryEncoder(fo)
        self.io = BinaryEncoder(BytesIO())
        self.block_count = 0
        self.sync_interval = sync_interval
        self.compression_level = compression_level

        if _is_appendable(self.encoder._fo):
            # Seek to the beginning to read the header
            self.encoder._fo.seek(0)
            avro_reader = reader(self.encoder._fo)
            header = avro_reader._header

            self._named_schemas = {}
            self.schema = parse_schema(avro_reader.writer_schema, self._named_schemas)

            codec = avro_reader.metadata.get("avro.codec", "null")

            self.sync_marker = header["sync"]

            # Seek to the end of the file
            self.encoder._fo.seek(0, 2)

            self.block_writer = BLOCK_WRITERS[codec]
        else:
            self.sync_marker = sync_marker or urandom(SYNC_SIZE)

            try:
                self.block_writer = BLOCK_WRITERS[codec]
            except KeyError:
                raise ValueError(f"unrecognized codec: {codec}")

            write_header(self.encoder, self.metadata, self.sync_marker)

    def dump(self):
        self.encoder.write_long(self.block_count)
        self.block_writer(self.encoder, self.io._fo.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)
        self.io._fo.truncate(0)
        self.io._fo.seek(0, SEEK_SET)
        self.block_count = 0

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(self.io, record, self.schema, self._named_schemas, "", self.options)
        self.block_count += 1
        if self.io._fo.tell() >= self.sync_interval:
            self.dump()

    def write_block(self, block):
        # Clear existing block if there are any records pending
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder.write_long(block.num_records)
        self.block_writer(self.encoder, block.bytes_.getvalue(), self.compression_level)
        self.encoder._fo.write(self.sync_marker)

    def flush(self):
        if self.io._fo.tell() or self.block_count > 0:
            self.dump()
        self.encoder._fo.flush()


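# Illustrative only: the Writer class above can be used directly for
# incremental writing, where records arrive over time and flush() finishes
# the current block.  A minimal sketch, assuming `parsed_schema` comes from
# parse_schema() and `incoming_records()` is a hypothetical record source:
#
#     with open("out.avro", "wb") as fp:
#         w = Writer(fp, parsed_schema)
#         for rec in incoming_records():
#             w.write(rec)
#         w.flush()

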

class JSONWriter(GenericWriter):
    def __init__(
        self,
        fo: AvroJSONEncoder,
        schema: Schema,
        codec: str = "null",
        sync_interval: int = 1000 * SYNC_SIZE,
        metadata: Optional[Dict[str, str]] = None,
        validator: bool = False,
        sync_marker: bytes = b"",
        codec_compression_level: Optional[int] = None,
        options: Dict[str, bool] = {},
    ):
        super().__init__(schema, metadata, validator, options)

        self.encoder = fo
        self.encoder.configure(self.schema, self._named_schemas)

    def write(self, record):
        if self.validate_fn:
            self.validate_fn(
                record, self.schema, self._named_schemas, "", True, self.options
            )
        write_data(
            self.encoder, record, self.schema, self._named_schemas, "", self.options
        )

    def flush(self):
        self.encoder.flush()



def writer(
    fo: Union[IO, AvroJSONEncoder],
    schema: Schema,
    records: Iterable[Any],
    codec: str = "null",
    sync_interval: int = 1000 * SYNC_SIZE,
    metadata: Optional[Dict[str, str]] = None,
    validator: bool = False,
    sync_marker: bytes = b"",
    codec_compression_level: Optional[int] = None,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write records to fo (stream) according to schema

    Parameters
    ----------
    fo
        Output stream
    schema
        Writer schema
    records
        Records to write. This is commonly a list of the dictionary
        representation of the records, but it can be any iterable
    codec
        Compression codec, can be 'null', 'deflate' or 'snappy' (if installed)
    sync_interval
        Size of sync interval
    metadata
        Header metadata
    validator
        If true, validation will be done on the records
    sync_marker
        A byte string used as the avro sync marker. If not provided, a random
        byte string will be used.
    codec_compression_level
        Compression level to use with the specified codec (if the codec
        supports it)
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        from fastavro import writer, parse_schema

        schema = {
            'doc': 'A weather reading.',
            'name': 'Weather',
            'namespace': 'test',
            'type': 'record',
            'fields': [
                {'name': 'station', 'type': 'string'},
                {'name': 'time', 'type': 'long'},
                {'name': 'temp', 'type': 'int'},
            ],
        }
        parsed_schema = parse_schema(schema)

        records = [
            {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
            {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
            {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
            {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
        ]

        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

    The `fo` argument is a file-like object so another common example usage
    would use an `io.BytesIO` object like so::

        from io import BytesIO
        from fastavro import writer

        fo = BytesIO()
        writer(fo, schema, records)

    Given an existing avro file, it's possible to append to it by re-opening
    the file in `a+b` mode. If the file is only opened in `ab` mode, we aren't
    able to read some of the existing header information and an error will be
    raised. For example::

        # Write initial records
        with open('weather.avro', 'wb') as out:
            writer(out, parsed_schema, records)

        # Write some more records
        with open('weather.avro', 'a+b') as out:
            writer(out, None, more_records)

    Note: When appending, any schema provided will be ignored since the schema
    in the avro file will be re-used. Therefore it is convenient to just use
    None as the schema.
    """
    # Sanity check that records is not a single dictionary (as that is a common
    # mistake and the exception that gets raised is not helpful)
    if isinstance(records, dict):
        raise ValueError('"records" argument should be an iterable, not dict')

    output: Union[JSONWriter, Writer]
    if isinstance(fo, AvroJSONEncoder):
        output = JSONWriter(
            fo,
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )
    else:
        output = Writer(
            BinaryEncoder(fo),
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
            codec_compression_level,
            options={
                "strict": strict,
                "strict_allow_default": strict_allow_default,
                "disable_tuple_notation": disable_tuple_notation,
            },
        )

    for record in records:
        output.write(record)
    output.flush()


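# Illustrative only: the codec and codec_compression_level arguments select
# one of the BLOCK_WRITERS defined above.  For example, reusing the
# parsed_schema/records from the docstring:
#
#     with open("weather.avro", "wb") as out:
#         writer(out, parsed_schema, records, codec="deflate",
#                codec_compression_level=7)

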

def schemaless_writer(
    fo: IO,
    schema: Schema,
    record: Any,
    *,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
):
    """Write a single record without the schema or header information

    Parameters
    ----------
    fo
        Output file
    schema
        Schema
    record
        Record to write
    strict
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states
    strict_allow_default
        If set to True, an error will be raised if records do not contain
        exactly the same fields that the schema states unless it is a missing
        field that has a default value in the schema
    disable_tuple_notation
        If set to True, tuples will not be treated as a special case. Therefore,
        using a tuple to indicate the type of a record will not work


    Example::

        parsed_schema = fastavro.parse_schema(schema)
        with open('file', 'wb') as fp:
            fastavro.schemaless_writer(fp, parsed_schema, record)

    Note: The ``schemaless_writer`` can only write a single record.
    """
    named_schemas: NamedSchemas = {}
    schema = parse_schema(schema, named_schemas)

    encoder = BinaryEncoder(fo)
    write_data(
        encoder,
        record,
        schema,
        named_schemas,
        "",
        {
            "strict": strict,
            "strict_allow_default": strict_allow_default,
            "disable_tuple_notation": disable_tuple_notation,
        },
    )
    encoder.flush()
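

# --- Illustrative round-trip sketch, not part of this module ----------------
# A message written with schemaless_writer can be read back with
# fastavro.schemaless_reader, provided the reader is given the same (or a
# compatible) schema.
def _schemaless_roundtrip_sketch() -> None:
    from io import BytesIO

    import fastavro

    schema = fastavro.parse_schema(
        {
            "name": "Ping",
            "type": "record",
            "fields": [{"name": "ts", "type": "long"}],
        }
    )
    buf = BytesIO()
    fastavro.schemaless_writer(buf, schema, {"ts": 1433269388})
    buf.seek(0)
    assert fastavro.schemaless_reader(buf, schema) == {"ts": 1433269388}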