# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A limited reimplementation of the TensorFlow FileIO API.

The TensorFlow version wraps the C++ FileSystem API. Here we provide a
pure Python implementation, limited to the features required for
TensorBoard. This allows running TensorBoard without depending on
TensorFlow for file operations.
"""

import dataclasses
import glob as py_glob
import io
import os
import os.path
import sys
import tempfile

try:
    import botocore.exceptions
    import boto3

    S3_ENABLED = True
except ImportError:
    S3_ENABLED = False

try:
    import fsspec

    FSSPEC_ENABLED = True
except ImportError:
    FSSPEC_ENABLED = False

if sys.version_info < (3, 0):
    # In Python 2 FileExistsError is not defined and the
    # error manifests itself as OSError.
    FileExistsError = OSError

from tensorboard.compat.tensorflow_stub import compat, errors


# A good default block size depends on the system in question.
# A somewhat conservative default is chosen here.
_DEFAULT_BLOCK_SIZE = 16 * 1024 * 1024


# Registry of filesystems by prefix.
#
# Currently supports "s3://" URLs for S3 based on boto3 and falls
# back to the local filesystem.
_REGISTERED_FILESYSTEMS = {}


def register_filesystem(prefix, filesystem):
    if ":" in prefix:
        raise ValueError("Filesystem prefix cannot contain a :")
    _REGISTERED_FILESYSTEMS[prefix] = filesystem


def get_filesystem(filename):
    """Return the registered filesystem for the given file."""
    filename = compat.as_str_any(filename)
    prefix = ""
    index = filename.find("://")
    if index >= 0:
        prefix = filename[:index]
    fs = _REGISTERED_FILESYSTEMS.get(prefix, None)
    if fs is None:
        fs = _get_fsspec_filesystem(filename)
    if fs is None:
        raise ValueError("No recognized filesystem for prefix %s" % prefix)
    return fs
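

# The registry above routes each path by its URL scheme: everything before
# "://" is the prefix, the empty prefix maps to the local filesystem, and
# unknown prefixes fall through to fsspec. A minimal sketch of the routing
# (editor's illustration, not part of the original module; the s3 case
# assumes boto3 is installed, the gs case that fsspec plus a matching
# backend such as gcsfs is installed):
#
#     get_filesystem("/tmp/logs")         # -> LocalFileSystem ("" prefix)
#     get_filesystem("s3://bucket/run1")  # -> S3FileSystem ("s3" prefix)
#     get_filesystem("gs://bucket/run1")  # -> fsspec wrapper, if fsspec
#                                         #    knows the "gs" protocol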


@dataclasses.dataclass(frozen=True)
class StatData:
    """Data returned from the Stat call.

    Attributes:
        length: Length of the data content.
    """

    length: int


class LocalFileSystem:
    """Provides local filesystem access."""

    def exists(self, filename):
        """Determines whether a path exists or not."""
        return os.path.exists(compat.as_bytes(filename))

    def join(self, path, *paths):
        """Join paths with path delimiter."""
        return os.path.join(path, *paths)

    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides
            either bytes read from the file (if `binary_mode == True`) or the
            decoded string representation thereof (otherwise), and
            `continuation_token` is an opaque value that can be passed to the
            next invocation of `read(...)` in order to continue from the last
            read position.
        """
        mode = "rb" if binary_mode else "r"
        encoding = None if binary_mode else "utf8"
        if not exists(filename):
            raise errors.NotFoundError(
                None, None, "Not Found: " + compat.as_text(filename)
            )
        offset = None
        if continue_from is not None:
            offset = continue_from.get("opaque_offset", None)
        with io.open(filename, mode, encoding=encoding) as f:
            if offset is not None:
                f.seek(offset)
            data = f.read(size)
            # The new offset may not be `offset + len(data)`, due to decoding
            # and newline translation, so just measure it in whatever terms
            # the underlying stream uses.
            continuation_token = {"opaque_offset": f.tell()}
            return (data, continuation_token)
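
    # Incremental reads hand back an opaque continuation token so a caller
    # can poll a growing file (e.g. a log being appended to) without
    # re-reading it from the start. A minimal sketch (editor's illustration,
    # not part of the original module; assumes "/tmp/demo.txt" exists):
    #
    #     fs = LocalFileSystem()
    #     data, token = fs.read("/tmp/demo.txt", size=10)
    #     ...                       # file grows
    #     more, token = fs.read("/tmp/demo.txt", continue_from=token)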

    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file, overwriting any existing
        contents.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "wb" if binary_mode else "w")

    def append(self, filename, file_content, binary_mode=False):
        """Append string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents to append
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "ab" if binary_mode else "a")

    def _write(self, filename, file_content, mode):
        encoding = None if "b" in mode else "utf8"
        with io.open(filename, mode, encoding=encoding) as f:
            compatify = compat.as_bytes if "b" in mode else compat.as_text
            f.write(compatify(file_content))

    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        if isinstance(filename, str):
            return [
                # Convert the filenames to string from bytes.
                compat.as_str_any(matching_filename)
                for matching_filename in py_glob.glob(compat.as_bytes(filename))
            ]
        else:
            return [
                # Convert the filenames to string from bytes.
                compat.as_str_any(matching_filename)
                for single_filename in filename
                for matching_filename in py_glob.glob(
                    compat.as_bytes(single_filename)
                )
            ]

    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        return os.path.isdir(compat.as_bytes(dirname))

    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        if not self.isdir(dirname):
            raise errors.NotFoundError(None, None, "Could not find directory")

        entries = os.listdir(compat.as_str_any(dirname))
        entries = [compat.as_str_any(item) for item in entries]
        return entries

    def makedirs(self, path):
        """Creates a directory and all parent/intermediate directories."""
        os.makedirs(path, exist_ok=True)

    def stat(self, filename):
        """Returns file statistics for a given path."""
        # NOTE: Size of the file is given by .st_size as returned from
        # os.stat(), but we convert to .length
        try:
            file_length = os.stat(compat.as_bytes(filename)).st_size
        except OSError:
            raise errors.NotFoundError(None, None, "Could not find file")
        return StatData(file_length)


class S3FileSystem:
    """Provides filesystem access to S3."""

    def __init__(self):
        # `S3_ENABLED` is False when the boto3 import at the top of this
        # module failed, in which case this filesystem cannot work.
        if not S3_ENABLED:
            raise ImportError("boto3 must be installed for S3 support.")
        self._s3_endpoint = os.environ.get("S3_ENDPOINT", None)

    def bucket_and_path(self, url):
        """Split an S3-prefixed URL into bucket and path."""
        url = compat.as_str_any(url)
        if url.startswith("s3://"):
            url = url[len("s3://") :]
        idx = url.index("/")
        bucket = url[:idx]
        path = url[(idx + 1) :]
        return bucket, path
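
    # For example, "s3://mybucket/logs/run1" splits into the bucket
    # "mybucket" and the path "logs/run1":
    #
    #     S3FileSystem().bucket_and_path("s3://mybucket/logs/run1")
    #     # -> ("mybucket", "logs/run1")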

    def exists(self, filename):
        """Determines whether a path exists or not."""
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        r = client.list_objects(Bucket=bucket, Prefix=path, Delimiter="/")
        if r.get("Contents") or r.get("CommonPrefixes"):
            return True
        return False

    def join(self, path, *paths):
        """Join paths with a slash."""
        return "/".join((path,) + paths)

    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides
            either bytes read from the file (if `binary_mode == True`) or the
            decoded string representation thereof (otherwise), and
            `continuation_token` is an opaque value that can be passed to the
            next invocation of `read(...)` in order to continue from the last
            read position.
        """
        s3 = boto3.resource("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        args = {}

        # For the S3 case, we use continuation tokens of the form
        # {byte_offset: number}
        offset = 0
        if continue_from is not None:
            offset = continue_from.get("byte_offset", 0)

        endpoint = ""
        if size is not None:
            # TODO(orionr): This endpoint risks splitting a multi-byte
            # character or splitting \r and \n in the case of CRLFs,
            # producing decoding errors below.
            endpoint = offset + size

        if offset != 0 or endpoint != "":
            # Asked for a range, so modify the request
            args["Range"] = "bytes={}-{}".format(offset, endpoint)

        try:
            stream = s3.Object(bucket, path).get(**args)["Body"].read()
        except botocore.exceptions.ClientError as exc:
            if exc.response["Error"]["Code"] in ["416", "InvalidRange"]:
                if size is not None:
                    # Asked for too much, so request just to the end. Do this
                    # in a second request so we don't check length in all cases.
                    client = boto3.client("s3", endpoint_url=self._s3_endpoint)
                    obj = client.head_object(Bucket=bucket, Key=path)
                    content_length = obj["ContentLength"]
                    endpoint = min(content_length, offset + size)
                if offset == endpoint:
                    # Asked for no bytes, so just return empty
                    stream = b""
                else:
                    args["Range"] = "bytes={}-{}".format(offset, endpoint)
                    stream = s3.Object(bucket, path).get(**args)["Body"].read()
            else:
                raise
        # `stream` should contain raw bytes here (i.e., there has been neither
        # decoding nor newline translation), so the byte offset increases by
        # the expected amount.
        continuation_token = {"byte_offset": (offset + len(stream))}
        if binary_mode:
            return (bytes(stream), continuation_token)
        else:
            return (stream.decode("utf-8"), continuation_token)
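
    # A note on the token format (editor's sketch, not part of the original
    # module): the S3 continuation token is a plain byte offset. A first
    # call with size=16 sends the header "Range: bytes=0-16", and the token
    # for the next call is {"byte_offset": offset + len(stream)}, computed
    # from the bytes actually returned.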

    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        # Always convert to bytes for writing
        if binary_mode:
            if not isinstance(file_content, bytes):
                raise TypeError("File content type must be bytes")
        else:
            file_content = compat.as_bytes(file_content)
        client.put_object(Body=file_content, Bucket=bucket, Key=path)

    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        # Only support prefix with * at the end and no ? in the string
        star_i = filename.find("*")
        quest_i = filename.find("?")
        if quest_i >= 0:
            raise NotImplementedError(
                "{} not supported by compat glob".format(filename)
            )
        if star_i != len(filename) - 1:
            # Just return empty so we can use glob from directory watcher
            #
            # TODO: Remove and instead handle in GetLogdirSubdirectories.
            # However, we would need to handle it for all non-local registered
            # filesystems in some way.
            return []
        filename = filename[:-1]
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        p = client.get_paginator("list_objects")
        keys = []
        for r in p.paginate(Bucket=bucket, Prefix=path):
            for o in r.get("Contents", []):
                key = o["Key"][len(path) :]
                if key:  # Skip the base dir, which would add an empty string
                    keys.append(filename + key)
        return keys

    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(dirname)
        if not path.endswith("/"):
            path += "/"  # This will now only retrieve subdir content
        r = client.list_objects(Bucket=bucket, Prefix=path, Delimiter="/")
        if r.get("Contents") or r.get("CommonPrefixes"):
            return True
        return False

    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(dirname)
        p = client.get_paginator("list_objects")
        if not path.endswith("/"):
            path += "/"  # This will now only retrieve subdir content
        keys = []
        for r in p.paginate(Bucket=bucket, Prefix=path, Delimiter="/"):
            keys.extend(
                o["Prefix"][len(path) : -1] for o in r.get("CommonPrefixes", [])
            )
            for o in r.get("Contents", []):
                key = o["Key"][len(path) :]
                if key:  # Skip the base dir, which would add an empty string
                    keys.append(key)
        return keys

    def makedirs(self, dirname):
        """Creates a directory and all parent/intermediate directories."""
        if not self.exists(dirname):
            client = boto3.client("s3", endpoint_url=self._s3_endpoint)
            bucket, path = self.bucket_and_path(dirname)
            if not path.endswith("/"):
                path += "/"  # This will make sure we don't override a file
            client.put_object(Body="", Bucket=bucket, Key=path)

    def stat(self, filename):
        """Returns file statistics for a given path."""
        # NOTE: Size of the file is given by ContentLength from S3,
        # but we convert to .length
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        try:
            obj = client.head_object(Bucket=bucket, Key=path)
            return StatData(obj["ContentLength"])
        except botocore.exceptions.ClientError as exc:
            if exc.response["Error"]["Code"] == "404":
                raise errors.NotFoundError(None, None, "Could not find file")
            else:
                raise


class FSSpecFileSystem:
    """Provides filesystem access via fsspec.

    The current gfile interface doesn't map perfectly to the fsspec
    interface, leading to some notable inefficiencies:

    * Reads and writes to files cause the file to be reopened each time,
      which can cause a performance hit when accessing local file systems.
    * walk doesn't use the native fsspec walk function, so performance may
      be slower.

    See https://github.com/tensorflow/tensorboard/issues/5286 for more info
    on limitations.
    """

    SEPARATOR = "://"
    CHAIN_SEPARATOR = "::"

    def _validate_path(self, path):
        parts = path.split(self.CHAIN_SEPARATOR)
        for part in parts[:-1]:
            if self.SEPARATOR in part:
                raise errors.InvalidArgumentError(
                    None,
                    None,
                    "fsspec URL must only have paths in the last chained filesystem, got {}".format(
                        path
                    ),
                )

    def _translate_errors(func):
        def func_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except FileNotFoundError as e:
                raise errors.NotFoundError(None, None, str(e))

        return func_wrapper

    def _fs_path(self, filename):
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")
        self._validate_path(filename)

        fs, path = fsspec.core.url_to_fs(filename)
        return fs, path

    @_translate_errors
    def exists(self, filename):
        """Determines whether a path exists or not."""
        fs, path = self._fs_path(filename)
        return fs.exists(path)

    def _join(self, sep, paths):
        """Joins the paths with the given separator."""
        result = []
        for part in paths:
            if part.startswith(sep):
                result = []
            if result and result[-1] and not result[-1].endswith(sep):
                result.append(sep)
            result.append(part)
        return "".join(result)

    @_translate_errors
    def join(self, path, *paths):
        """Join paths with a slash."""
        self._validate_path(path)

        before, sep, last_path = path.rpartition(self.CHAIN_SEPARATOR)
        chain_prefix = before + sep
        protocol, path = fsspec.core.split_protocol(last_path)
        fs = fsspec.get_filesystem_class(protocol)
        if protocol:
            chain_prefix += protocol + self.SEPARATOR
        return chain_prefix + self._join(fs.sep, ((path,) + paths))
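
    # Chained URLs keep their chain and protocol prefixes when joined. A
    # minimal sketch (editor's illustration, not part of the original
    # module; assumes fsspec and an s3 backend such as s3fs are installed):
    #
    #     FSSpecFileSystem().join("simplecache::s3://bucket/run", "x.txt")
    #     # -> "simplecache::s3://bucket/run/x.txt"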

    @_translate_errors
    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides
            either bytes read from the file (if `binary_mode == True`) or the
            decoded string representation thereof (otherwise), and
            `continuation_token` is an opaque value that can be passed to the
            next invocation of `read(...)` in order to continue from the last
            read position.
        """
        fs, path = self._fs_path(filename)

        mode = "rb" if binary_mode else "r"
        encoding = None if binary_mode else "utf8"
        if not exists(filename):
            raise errors.NotFoundError(
                None, None, "Not Found: " + compat.as_text(filename)
            )
        with fs.open(path, mode, encoding=encoding) as f:
            if continue_from is not None:
                if not f.seekable():
                    raise errors.InvalidArgumentError(
                        None,
                        None,
                        "{} is not seekable".format(filename),
                    )
                offset = continue_from.get("opaque_offset", None)
                if offset is not None:
                    f.seek(offset)

            data = f.read(size)
            # The new offset may not be `offset + len(data)`, due to decoding
            # and newline translation, so just measure it in whatever terms
            # the underlying stream uses.
            continuation_token = (
                {"opaque_offset": f.tell()} if f.seekable() else {}
            )
            return (data, continuation_token)

    @_translate_errors
    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "wb" if binary_mode else "w")

    @_translate_errors
    def append(self, filename, file_content, binary_mode=False):
        """Append string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents to append
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "ab" if binary_mode else "a")

    def _write(self, filename, file_content, mode):
        fs, path = self._fs_path(filename)
        encoding = None if "b" in mode else "utf8"
        with fs.open(path, mode, encoding=encoding) as f:
            compatify = compat.as_bytes if "b" in mode else compat.as_text
            f.write(compatify(file_content))

    def _get_chain_protocol_prefix(self, filename):
        chain_prefix, chain_sep, last_path = filename.rpartition(
            self.CHAIN_SEPARATOR
        )
        protocol, sep, _ = last_path.rpartition(self.SEPARATOR)
        return chain_prefix + chain_sep + protocol + sep

    @_translate_errors
    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")

        fs, path = self._fs_path(filename)
        files = fs.glob(path)

        # Check if applying the original chaining is required.
        if (
            self.SEPARATOR not in filename
            and self.CHAIN_SEPARATOR not in filename
        ):
            return files

        prefix = self._get_chain_protocol_prefix(filename)

        return [
            file
            if (self.SEPARATOR in file or self.CHAIN_SEPARATOR in file)
            else prefix + file
            for file in files
        ]

    @_translate_errors
    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        fs, path = self._fs_path(dirname)
        return fs.isdir(path)

    @_translate_errors
    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        fs, path = self._fs_path(dirname)
        files = fs.listdir(path, detail=False)
        files = [os.path.basename(fname) for fname in files]
        return files

    @_translate_errors
    def makedirs(self, dirname):
        """Creates a directory and all parent/intermediate directories."""
        fs, path = self._fs_path(dirname)
        return fs.makedirs(path, exist_ok=True)

    @_translate_errors
    def stat(self, filename):
        """Returns file statistics for a given path."""
        fs, path = self._fs_path(filename)
        return StatData(fs.size(path))


_FSSPEC_FILESYSTEM = FSSpecFileSystem()


def _get_fsspec_filesystem(filename):
    """
    Checks whether the provided protocol is known to fsspec and, if so,
    returns the filesystem wrapper for it.
    """
    if not FSSPEC_ENABLED:
        return None

    segment = filename.partition(FSSpecFileSystem.CHAIN_SEPARATOR)[0]
    protocol = segment.partition(FSSpecFileSystem.SEPARATOR)[0]
    if fsspec.get_filesystem_class(protocol):
        return _FSSPEC_FILESYSTEM
    else:
        return None


register_filesystem("", LocalFileSystem())
if S3_ENABLED:
    register_filesystem("s3", S3FileSystem())
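

# Additional backends can be wired in through the same registry. A
# hypothetical sketch (editor's illustration, not part of the original
# module; MemcacheFileSystem is an invented name, not a real class):
#
#     register_filesystem("memcache", MemcacheFileSystem())
#     # get_filesystem("memcache://host/key") would then route to it.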


class GFile:
    # Only methods needed for TensorBoard are implemented.

    def __init__(self, filename, mode):
        if mode not in ("r", "rb", "br", "w", "wb", "bw"):
            raise NotImplementedError(
                "mode {} not supported by compat GFile".format(mode)
            )
        self.filename = compat.as_bytes(filename)
        self.fs = get_filesystem(self.filename)
        self.fs_supports_append = hasattr(self.fs, "append")
        self.buff = None
        # The buffer offset and the buffer chunk size are measured in the
        # natural units of the underlying stream, i.e. bytes for binary mode,
        # or characters in text mode.
        self.buff_chunk_size = _DEFAULT_BLOCK_SIZE
        self.buff_offset = 0
        self.continuation_token = None
        self.write_temp = None
        self.write_started = False
        self.binary_mode = "b" in mode
        self.write_mode = "w" in mode
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        self.buff = None
        self.buff_offset = 0
        self.continuation_token = None

    def __iter__(self):
        return self

    def _read_buffer_to_offset(self, new_buff_offset):
        old_buff_offset = self.buff_offset
        read_size = min(len(self.buff), new_buff_offset) - old_buff_offset
        self.buff_offset += read_size
        return self.buff[old_buff_offset : old_buff_offset + read_size]

    def read(self, n=None):
        """Reads contents of file to a string.

        Args:
            n: int, number of bytes or characters to read, otherwise
                read all the contents of the file

        Returns:
            Subset of the contents of the file as a string or bytes.
        """
        if self.write_mode:
            raise errors.PermissionDeniedError(
                None, None, "File not opened in read mode"
            )

        result = None
        if self.buff and len(self.buff) > self.buff_offset:
            # Read from the local buffer first.
            if n is not None:
                chunk = self._read_buffer_to_offset(self.buff_offset + n)
                if len(chunk) == n:
                    return chunk
                result = chunk
                n -= len(chunk)
            else:
                # Take the entire local buffer and update offsets.
                result = self._read_buffer_to_offset(len(self.buff))

        # Read from the filesystem.
        read_size = max(self.buff_chunk_size, n) if n is not None else None
        (self.buff, self.continuation_token) = self.fs.read(
            self.filename, self.binary_mode, read_size, self.continuation_token
        )
        self.buff_offset = 0

        # Add the freshly read data.
        if n is not None:
            chunk = self._read_buffer_to_offset(n)
        else:
            # Take the entire local buffer and update offsets.
            chunk = self._read_buffer_to_offset(len(self.buff))
        result = result + chunk if result else chunk

        return result

    def write(self, file_content):
        """Writes string file contents to file, clearing contents of the file
        on first write and then appending on subsequent calls.

        Args:
            file_content: string, the contents
        """
        if not self.write_mode:
            raise errors.PermissionDeniedError(
                None, None, "File not opened in write mode"
            )
        if self.closed:
            raise errors.FailedPreconditionError(
                None, None, "File already closed"
            )

        if self.fs_supports_append:
            if not self.write_started:
                # Write the first chunk to truncate the file if it already
                # exists.
                self.fs.write(self.filename, file_content, self.binary_mode)
                self.write_started = True
            else:
                # Append the later chunks.
                self.fs.append(self.filename, file_content, self.binary_mode)
        else:
            # Add to a temp file, but wait for flush to write to the final
            # filesystem.
            if self.write_temp is None:
                mode = "w+b" if self.binary_mode else "w+"
                self.write_temp = tempfile.TemporaryFile(mode)

            compatify = compat.as_bytes if self.binary_mode else compat.as_text
            self.write_temp.write(compatify(file_content))

    def __next__(self):
        line = None
        while True:
            if not self.buff:
                # Read one unit into the buffer.
                line = self.read(1)
                if line and (line[-1] == "\n" or not self.buff):
                    return line
                if not self.buff:
                    raise StopIteration()
            else:
                index = self.buff.find("\n", self.buff_offset)
                if index != -1:
                    # Include the line up to and including the newline.
                    chunk = self.read(index + 1 - self.buff_offset)
                    line = line + chunk if line else chunk
                    return line

                # Read one unit past the end of the buffer.
                chunk = self.read(len(self.buff) + 1 - self.buff_offset)
                line = line + chunk if line else chunk
                if line and (line[-1] == "\n" or not self.buff):
                    return line
                if not self.buff:
                    raise StopIteration()

    def next(self):
        return self.__next__()

    def flush(self):
        if self.closed:
            raise errors.FailedPreconditionError(
                None, None, "File already closed"
            )

        if not self.fs_supports_append:
            if self.write_temp is not None:
                # Read the temp file from the beginning.
                self.write_temp.flush()
                self.write_temp.seek(0)
                chunk = self.write_temp.read()
                if chunk is not None:
                    # Write the full contents and keep them in the temp file.
                    self.fs.write(self.filename, chunk, self.binary_mode)
                    self.write_temp.seek(len(chunk))

    def close(self):
        self.flush()
        if self.write_temp is not None:
            self.write_temp.close()
            self.write_temp = None
            self.write_started = False
        self.closed = True
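

# Typical usage mirrors the built-in `open`. A minimal sketch (editor's
# illustration, not part of the original module):
#
#     with GFile("/tmp/demo.txt", "w") as f:
#         f.write("hello\n")
#     with GFile("/tmp/demo.txt", "r") as f:
#         for line in f:
#             print(line)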


def exists(filename):
    """Determines whether a path exists or not.

    Args:
        filename: string, a path

    Returns:
        True if the path exists, whether it's a file or a directory.
        False if the path does not exist and there are no filesystem errors.

    Raises:
        errors.OpError: Propagates any errors reported by the FileSystem API.
    """
    return get_filesystem(filename).exists(filename)


def glob(filename):
    """Returns a list of files that match the given pattern(s).

    Args:
        filename: string or iterable of strings. The glob pattern(s).

    Returns:
        A list of strings containing filenames that match the given pattern(s).

    Raises:
        errors.OpError: If there are filesystem / directory listing errors.
    """
    return get_filesystem(filename).glob(filename)


def isdir(dirname):
    """Returns whether the path is a directory or not.

    Args:
        dirname: string, path to a potential directory

    Returns:
        True if the path is a directory; False otherwise.
    """
    return get_filesystem(dirname).isdir(dirname)


def listdir(dirname):
    """Returns a list of entries contained within a directory.

    The list is in arbitrary order. It does not contain the special entries
    "." and "..".

    Args:
        dirname: string, path to a directory

    Returns:
        [filename1, filename2, ... filenameN] as strings

    Raises:
        errors.NotFoundError: if the directory doesn't exist
    """
    return get_filesystem(dirname).listdir(dirname)


def makedirs(path):
    """Creates a directory and all parent/intermediate directories.

    It succeeds if the path already exists and is writable.

    Args:
        path: string, name of the directory to be created
    """
    return get_filesystem(path).makedirs(path)


def walk(top, topdown=True, onerror=None):
    """Recursive directory tree generator for directories.

    Args:
        top: string, a directory name
        topdown: bool, traverse pre order if True, post order if False.
        onerror: optional handler for errors. Should be a function; it will
            be called with the error as argument. Rethrowing the error from
            the handler aborts the walk. Errors that happen while listing
            directories are otherwise ignored.

    Yields:
        Each yield is a 3-tuple: the pathname of a directory, followed by
        lists of all its subdirectories and leaf files:
        (dirname, [subdirname, subdirname, ...], [filename, filename, ...])
        as strings.
    """
    top = compat.as_str_any(top)
    fs = get_filesystem(top)
    try:
        listing = listdir(top)
    except errors.NotFoundError as err:
        if onerror:
            onerror(err)
        # A missing directory ends the walk of this subtree either way;
        # rethrowing from `onerror` aborts the entire walk.
        return

    files = []
    subdirs = []
    for item in listing:
        full_path = fs.join(top, compat.as_str_any(item))
        if isdir(full_path):
            subdirs.append(item)
        else:
            files.append(item)

    here = (top, subdirs, files)

    if topdown:
        yield here

    for subdir in subdirs:
        joined_subdir = fs.join(top, compat.as_str_any(subdir))
        for subitem in walk(joined_subdir, topdown, onerror=onerror):
            yield subitem

    if not topdown:
        yield here
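

# A short sketch of consuming `walk` (editor's illustration, not part of
# the original module):
#
#     for dirname, subdirs, files in walk("/tmp/logs"):
#         print(dirname, subdirs, files)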


def stat(filename):
    """Returns file statistics for a given path.

    Args:
        filename: string, path to a file

    Returns:
        FileStatistics struct that contains information about the path.

    Raises:
        errors.OpError: If the operation fails.
    """
    return get_filesystem(filename).stat(filename)


# Used for tests only
def _write_string_to_file(filename, file_content):
    """Writes a string to a given file.

    Args:
        filename: string, path to a file
        file_content: string, contents that need to be written to the file

    Raises:
        errors.OpError: If there are errors during the operation.
    """
    with GFile(filename, mode="w") as f:
        f.write(compat.as_text(file_content))


# Used for tests only
def _read_file_to_string(filename, binary_mode=False):
    """Reads the entire contents of a file to a string.

    Args:
        filename: string, path to a file
        binary_mode: whether to open the file in binary mode or not. This
            changes the type of the object returned.

    Returns:
        Contents of the file as a string or bytes.

    Raises:
        errors.OpError: Raises a variety of error subtypes, e.g.
            `NotFoundError`.
    """
    if binary_mode:
        f = GFile(filename, mode="rb")
    else:
        f = GFile(filename, mode="r")
    return f.read()