Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorboard/compat/tensorflow_stub/io/gfile.py: 23%
475 statements
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""A limited reimplementation of the TensorFlow FileIO API.
17The TensorFlow version wraps the C++ FileSystem API. Here we provide a
18pure Python implementation, limited to the features required for
19TensorBoard. This allows running TensorBoard without depending on
20TensorFlow for file operations.
21"""

import dataclasses
import glob as py_glob
import io
import os
import os.path
import sys
import tempfile

try:
    import botocore.exceptions
    import boto3

    S3_ENABLED = True
except ImportError:
    S3_ENABLED = False

try:
    import fsspec

    FSSPEC_ENABLED = True
except ImportError:
    FSSPEC_ENABLED = False

if sys.version_info < (3, 0):
    # In Python 2, FileExistsError is not defined and the error
    # manifests as OSError.
    FileExistsError = OSError

from tensorboard.compat.tensorflow_stub import compat, errors


# A good default block size depends on the system in question.
# A somewhat conservative default chosen here.
_DEFAULT_BLOCK_SIZE = 16 * 1024 * 1024


# Registry of filesystems by prefix.
#
# Currently supports "s3://" URLs for S3 based on boto3 and falls
# back to local filesystem.
_REGISTERED_FILESYSTEMS = {}


def register_filesystem(prefix, filesystem):
    if ":" in prefix:
        raise ValueError("Filesystem prefix cannot contain a :")
    _REGISTERED_FILESYSTEMS[prefix] = filesystem


def get_filesystem(filename):
    """Return the registered filesystem for the given file."""
    filename = compat.as_str_any(filename)
    prefix = ""
    index = filename.find("://")
    if index >= 0:
        prefix = filename[:index]
    fs = _REGISTERED_FILESYSTEMS.get(prefix, None)
    if fs is None:
        fs = _get_fsspec_filesystem(filename)
    if fs is None:
        raise ValueError("No recognized filesystem for prefix %s" % prefix)
    return fs
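
# Illustrative resolution (an added example, not part of the original
# module): the prefix is whatever precedes "://", and the empty prefix
# maps to the local filesystem registered at the bottom of this file.
#
#     get_filesystem("/tmp/logs")         # -> the LocalFileSystem instance
#     get_filesystem("s3://bucket/run1")  # -> the S3FileSystem instance
#                                         #    (when boto3 is installed)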


@dataclasses.dataclass(frozen=True)
class StatData:
    """Data returned from the Stat call.

    Attributes:
        length: Length of the data content.
    """

    length: int


class LocalFileSystem:
    """Provides local filesystem access."""

    def exists(self, filename):
        """Determines whether a path exists or not."""
        return os.path.exists(compat.as_bytes(filename))

    def join(self, path, *paths):
        """Join paths with path delimiter."""
        return os.path.join(path, *paths)

    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides either
            bytes read from the file (if `binary_mode == True`) or the decoded
            string representation thereof (otherwise), and `continuation_token`
            is an opaque value that can be passed to the next invocation of
            `read(...)` in order to continue from the last read position.
        """
        mode = "rb" if binary_mode else "r"
        encoding = None if binary_mode else "utf8"
        if not exists(filename):
            raise errors.NotFoundError(
                None, None, "Not Found: " + compat.as_text(filename)
            )
        offset = None
        if continue_from is not None:
            offset = continue_from.get("opaque_offset", None)
        with io.open(filename, mode, encoding=encoding) as f:
            if offset is not None:
                f.seek(offset)
            data = f.read(size)
            # The new offset may not be `offset + len(data)`, due to decoding
            # and newline translation.
            # So, just measure it in whatever terms the underlying stream uses.
            continuation_token = {"opaque_offset": f.tell()}
            return (data, continuation_token)
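
    # An added illustration (not in the original module) of the
    # continuation-token protocol, assuming a local file "/tmp/events.log"
    # that grows between calls:
    #
    #     fs = LocalFileSystem()
    #     data, token = fs.read("/tmp/events.log", binary_mode=True)
    #     # ... later: pick up only the newly appended bytes ...
    #     more, token = fs.read(
    #         "/tmp/events.log", binary_mode=True, continue_from=token
    #     )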

    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file, overwriting any existing
        contents.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "wb" if binary_mode else "w")

    def append(self, filename, file_content, binary_mode=False):
        """Append string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents to append
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "ab" if binary_mode else "a")

    def _write(self, filename, file_content, mode):
        encoding = None if "b" in mode else "utf8"
        with io.open(filename, mode, encoding=encoding) as f:
            compatify = compat.as_bytes if "b" in mode else compat.as_text
            f.write(compatify(file_content))

    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        if isinstance(filename, str):
            return [
                # Convert the filenames to string from bytes.
                compat.as_str_any(matching_filename)
                for matching_filename in py_glob.glob(compat.as_bytes(filename))
            ]
        else:
            return [
                # Convert the filenames to string from bytes.
                compat.as_str_any(matching_filename)
                for single_filename in filename
                for matching_filename in py_glob.glob(
                    compat.as_bytes(single_filename)
                )
            ]

    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        return os.path.isdir(compat.as_bytes(dirname))

    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        if not self.isdir(dirname):
            raise errors.NotFoundError(None, None, "Could not find directory")

        entries = os.listdir(compat.as_str_any(dirname))
        entries = [compat.as_str_any(item) for item in entries]
        return entries

    def makedirs(self, path):
        """Creates a directory and all parent/intermediate directories."""
        os.makedirs(path, exist_ok=True)

    def stat(self, filename):
        """Returns file statistics for a given path."""
        # NOTE: Size of the file is given by .st_size as returned from
        # os.stat(), but we convert to .length
        try:
            file_length = os.stat(compat.as_bytes(filename)).st_size
        except OSError:
            raise errors.NotFoundError(None, None, "Could not find file")
        return StatData(file_length)


class S3FileSystem:
    """Provides filesystem access to S3."""

    def __init__(self):
        # Check the `S3_ENABLED` flag rather than the possibly-unbound
        # `boto3` name, so a missing dependency raises the intended
        # ImportError instead of a NameError.
        if not S3_ENABLED:
            raise ImportError("boto3 must be installed for S3 support.")
        self._s3_endpoint = os.environ.get("S3_ENDPOINT", None)

    def bucket_and_path(self, url):
        """Split an S3-prefixed URL into bucket and path."""
        url = compat.as_str_any(url)
        if url.startswith("s3://"):
            url = url[len("s3://") :]
        idx = url.index("/")
        bucket = url[:idx]
        path = url[(idx + 1) :]
        return bucket, path
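
    # For example (an added illustration, not in the original module):
    #
    #     S3FileSystem().bucket_and_path("s3://my-bucket/logs/run1")
    #     # -> ("my-bucket", "logs/run1")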

    def exists(self, filename):
        """Determines whether a path exists or not."""
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        r = client.list_objects(Bucket=bucket, Prefix=path, Delimiter="/")
        if r.get("Contents") or r.get("CommonPrefixes"):
            return True
        return False

    def join(self, path, *paths):
        """Join paths with a slash."""
        return "/".join((path,) + paths)

    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides either
            bytes read from the file (if `binary_mode == True`) or the decoded
            string representation thereof (otherwise), and `continuation_token`
            is an opaque value that can be passed to the next invocation of
            `read(...)` in order to continue from the last read position.
        """
        s3 = boto3.resource("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        args = {}

        # For the S3 case, we use continuation tokens of the form
        # {byte_offset: number}
        offset = 0
        if continue_from is not None:
            offset = continue_from.get("byte_offset", 0)

        endpoint = ""
        if size is not None:
            # TODO(orionr): This endpoint risks splitting a multi-byte
            # character or splitting \r and \n in the case of CRLFs,
            # producing decoding errors below.
            endpoint = offset + size

        if offset != 0 or endpoint != "":
            # Asked for a range, so modify the request
            args["Range"] = "bytes={}-{}".format(offset, endpoint)

        try:
            stream = s3.Object(bucket, path).get(**args)["Body"].read()
        except botocore.exceptions.ClientError as exc:
            if exc.response["Error"]["Code"] in ["416", "InvalidRange"]:
                if size is not None:
                    # Asked for too much, so request just to the end. Do this
                    # in a second request so we don't check length in all cases.
                    client = boto3.client("s3", endpoint_url=self._s3_endpoint)
                    obj = client.head_object(Bucket=bucket, Key=path)
                    content_length = obj["ContentLength"]
                    endpoint = min(content_length, offset + size)
                if offset == endpoint:
                    # Asked for no bytes, so just return empty
                    stream = b""
                else:
                    args["Range"] = "bytes={}-{}".format(offset, endpoint)
                    stream = s3.Object(bucket, path).get(**args)["Body"].read()
            else:
                raise
        # `stream` should contain raw bytes here (i.e., there has been neither
        # decoding nor newline translation), so the byte offset increases by
        # the expected amount.
        continuation_token = {"byte_offset": (offset + len(stream))}
        if binary_mode:
            return (bytes(stream), continuation_token)
        else:
            return (stream.decode("utf-8"), continuation_token)
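
    # Added note with an example (not in the original module): with
    # offset=0 and size=100, the request header is "Range: bytes=0-100".
    # HTTP byte ranges are inclusive of both endpoints, so S3 may return
    # one more byte than `size`; the continuation token simply records
    # offset + len(stream), wherever that lands.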

    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        # Always convert to bytes for writing
        if binary_mode:
            if not isinstance(file_content, bytes):
                raise TypeError("File content type must be bytes")
        else:
            file_content = compat.as_bytes(file_content)
        client.put_object(Body=file_content, Bucket=bucket, Key=path)

    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        # Only support prefix with * at the end and no ? in the string
        star_i = filename.find("*")
        quest_i = filename.find("?")
        if quest_i >= 0:
            raise NotImplementedError(
                "{} not supported by compat glob".format(filename)
            )
        if star_i != len(filename) - 1:
            # Just return empty so we can use glob from directory watcher
            #
            # TODO: Remove and instead handle in GetLogdirSubdirectories.
            # However, we would need to handle it for all non-local registered
            # filesystems in some way.
            return []
        filename = filename[:-1]
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        p = client.get_paginator("list_objects")
        keys = []
        for r in p.paginate(Bucket=bucket, Prefix=path):
            for o in r.get("Contents", []):
                key = o["Key"][len(path) :]
                if key:  # Skip the base dir, which would add an empty string
                    keys.append(filename + key)
        return keys
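
    # Added examples of the supported patterns (not in the original module);
    # the bucket and keys are hypothetical:
    #
    #     fs.glob("s3://bucket/run1/*")    # prefix listing under "run1/"
    #     fs.glob("s3://bucket/*/tags")    # -> [] (star not at the end)
    #     fs.glob("s3://bucket/run?")      # raises NotImplementedError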

    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(dirname)
        if not path.endswith("/"):
            path += "/"  # This will now only retrieve subdir content
        r = client.list_objects(Bucket=bucket, Prefix=path, Delimiter="/")
        if r.get("Contents") or r.get("CommonPrefixes"):
            return True
        return False

    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(dirname)
        p = client.get_paginator("list_objects")
        if not path.endswith("/"):
            path += "/"  # This will now only retrieve subdir content
        keys = []
        for r in p.paginate(Bucket=bucket, Prefix=path, Delimiter="/"):
            keys.extend(
                o["Prefix"][len(path) : -1] for o in r.get("CommonPrefixes", [])
            )
            for o in r.get("Contents", []):
                key = o["Key"][len(path) :]
                if key:  # Skip the base dir, which would add an empty string
                    keys.append(key)
        return keys

    def makedirs(self, dirname):
        """Creates a directory and all parent/intermediate directories."""
        if not self.exists(dirname):
            client = boto3.client("s3", endpoint_url=self._s3_endpoint)
            bucket, path = self.bucket_and_path(dirname)
            if not path.endswith("/"):
                path += "/"  # This will make sure we don't override a file
            client.put_object(Body="", Bucket=bucket, Key=path)

    def stat(self, filename):
        """Returns file statistics for a given path."""
        # NOTE: Size of the file is given by ContentLength from S3,
        # but we convert to .length
        client = boto3.client("s3", endpoint_url=self._s3_endpoint)
        bucket, path = self.bucket_and_path(filename)
        try:
            obj = client.head_object(Bucket=bucket, Key=path)
            return StatData(obj["ContentLength"])
        except botocore.exceptions.ClientError as exc:
            if exc.response["Error"]["Code"] == "404":
                raise errors.NotFoundError(None, None, "Could not find file")
            else:
                raise


class FSSpecFileSystem:
    """Provides filesystem access via fsspec.

    The current gfile interface doesn't map perfectly to the fsspec interface
    leading to some notable inefficiencies.

    * Reads and writes to files cause the file to be reopened each time which
      can cause a performance hit when accessing local file systems.
    * walk doesn't use the native fsspec walk function so performance may be
      slower.

    See https://github.com/tensorflow/tensorboard/issues/5286 for more info on
    limitations.
    """

    SEPARATOR = "://"
    CHAIN_SEPARATOR = "::"

    def _validate_path(self, path):
        parts = path.split(self.CHAIN_SEPARATOR)
        for part in parts[:-1]:
            if self.SEPARATOR in part:
                raise errors.InvalidArgumentError(
                    None,
                    None,
                    "fsspec URL must only have paths in the last chained filesystem, got {}".format(
                        path
                    ),
                )

    def _translate_errors(func):
        def func_wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except FileNotFoundError as e:
                raise errors.NotFoundError(None, None, str(e))

        return func_wrapper

    def _fs_path(self, filename):
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")
        self._validate_path(filename)

        fs, path = fsspec.core.url_to_fs(filename)
        return fs, path

    @_translate_errors
    def exists(self, filename):
        """Determines whether a path exists or not."""
        fs, path = self._fs_path(filename)
        return fs.exists(path)

    def _join(self, sep, paths):
        """
        _join joins the paths with the given separator.
        """
        result = []
        for part in paths:
            if part.startswith(sep):
                result = []
            if result and result[-1] and not result[-1].endswith(sep):
                result.append(sep)
            result.append(part)
        return "".join(result)

    @_translate_errors
    def join(self, path, *paths):
        """Join paths with a slash."""
        self._validate_path(path)

        before, sep, last_path = path.rpartition(self.CHAIN_SEPARATOR)
        chain_prefix = before + sep
        protocol, path = fsspec.core.split_protocol(last_path)
        fs = fsspec.get_filesystem_class(protocol)
        if protocol:
            chain_prefix += protocol + self.SEPARATOR
        return chain_prefix + self._join(fs.sep, ((path,) + paths))
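
    # Added examples (not in the original module): the protocol and any
    # chained prefix are peeled off, the remaining segments are joined with
    # the target filesystem's separator, and the prefix is reattached.
    #
    #     fs.join("s3://bucket/dir", "file")
    #     # -> "s3://bucket/dir/file"
    #     fs.join("filecache::s3://bucket", "file")
    #     # -> "filecache::s3://bucket/file"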

    @_translate_errors
    def read(self, filename, binary_mode=False, size=None, continue_from=None):
        """Reads contents of a file to a string.

        Args:
            filename: string, a path
            binary_mode: bool, read as binary if True, otherwise text
            size: int, number of bytes or characters to read, otherwise
                read all the contents of the file (from the continuation
                marker, if present).
            continue_from: An opaque value returned from a prior invocation of
                `read(...)` marking the last read position, so that reading
                may continue from there. Otherwise read from the beginning.

        Returns:
            A tuple of `(data, continuation_token)` where `data` provides either
            bytes read from the file (if `binary_mode == True`) or the decoded
            string representation thereof (otherwise), and `continuation_token`
            is an opaque value that can be passed to the next invocation of
            `read(...)` in order to continue from the last read position.
        """
        fs, path = self._fs_path(filename)

        mode = "rb" if binary_mode else "r"
        encoding = None if binary_mode else "utf8"
        if not exists(filename):
            raise errors.NotFoundError(
                None, None, "Not Found: " + compat.as_text(filename)
            )
        with fs.open(path, mode, encoding=encoding) as f:
            if continue_from is not None:
                if not f.seekable():
                    raise errors.InvalidArgumentError(
                        None,
                        None,
                        "{} is not seekable".format(filename),
                    )
                offset = continue_from.get("opaque_offset", None)
                if offset is not None:
                    f.seek(offset)

            data = f.read(size)
            # The new offset may not be `offset + len(data)`, due to decoding
            # and newline translation.
            # So, just measure it in whatever terms the underlying stream uses.
            continuation_token = (
                {"opaque_offset": f.tell()} if f.seekable() else {}
            )
            return (data, continuation_token)

    @_translate_errors
    def write(self, filename, file_content, binary_mode=False):
        """Writes string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "wb" if binary_mode else "w")

    @_translate_errors
    def append(self, filename, file_content, binary_mode=False):
        """Append string file contents to a file.

        Args:
            filename: string, a path
            file_content: string, the contents to append
            binary_mode: bool, write as binary if True, otherwise text
        """
        self._write(filename, file_content, "ab" if binary_mode else "a")

    def _write(self, filename, file_content, mode):
        fs, path = self._fs_path(filename)
        encoding = None if "b" in mode else "utf8"
        with fs.open(path, mode, encoding=encoding) as f:
            compatify = compat.as_bytes if "b" in mode else compat.as_text
            f.write(compatify(file_content))

    def _get_chain_protocol_prefix(self, filename):
        chain_prefix, chain_sep, last_path = filename.rpartition(
            self.CHAIN_SEPARATOR
        )
        protocol, sep, _ = last_path.rpartition(self.SEPARATOR)
        return chain_prefix + chain_sep + protocol + sep

    @_translate_errors
    def glob(self, filename):
        """Returns a list of files that match the given pattern(s)."""
        if isinstance(filename, bytes):
            filename = filename.decode("utf-8")

        fs, path = self._fs_path(filename)
        files = fs.glob(path)

        # check if applying the original chaining is required.
        if (
            self.SEPARATOR not in filename
            and self.CHAIN_SEPARATOR not in filename
        ):
            return files

        prefix = self._get_chain_protocol_prefix(filename)

        return [
            file
            if (self.SEPARATOR in file or self.CHAIN_SEPARATOR in file)
            else prefix + file
            for file in files
        ]

    @_translate_errors
    def isdir(self, dirname):
        """Returns whether the path is a directory or not."""
        fs, path = self._fs_path(dirname)
        return fs.isdir(path)

    @_translate_errors
    def listdir(self, dirname):
        """Returns a list of entries contained within a directory."""
        fs, path = self._fs_path(dirname)
        files = fs.listdir(path, detail=False)
        files = [os.path.basename(fname) for fname in files]
        return files

    @_translate_errors
    def makedirs(self, dirname):
        """Creates a directory and all parent/intermediate directories."""
        fs, path = self._fs_path(dirname)
        return fs.makedirs(path, exist_ok=True)

    @_translate_errors
    def stat(self, filename):
        """Returns file statistics for a given path."""
        fs, path = self._fs_path(filename)
        return StatData(fs.size(path))


_FSSPEC_FILESYSTEM = FSSpecFileSystem()


def _get_fsspec_filesystem(filename):
    """
    _get_fsspec_filesystem checks if the provided protocol is known to fsspec
    and if so returns the filesystem wrapper for it.
    """
    if not FSSPEC_ENABLED:
        return None

    segment = filename.partition(FSSpecFileSystem.CHAIN_SEPARATOR)[0]
    protocol = segment.partition(FSSpecFileSystem.SEPARATOR)[0]
    if fsspec.get_filesystem_class(protocol):
        return _FSSPEC_FILESYSTEM
    else:
        return None
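
# Added illustration (not in the original module): the protocol checked
# against fsspec is the one on the *first* chained segment.
#
#     "gs://bucket/file"            -> protocol "gs"
#     "filecache::s3://bucket/key"  -> protocol "filecache"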


register_filesystem("", LocalFileSystem())
if S3_ENABLED:
    register_filesystem("s3", S3FileSystem())


class GFile:
    # Only methods needed for TensorBoard are implemented.

    def __init__(self, filename, mode):
        if mode not in ("r", "rb", "br", "w", "wb", "bw"):
            raise NotImplementedError(
                "mode {} not supported by compat GFile".format(mode)
            )
        self.filename = compat.as_bytes(filename)
        self.fs = get_filesystem(self.filename)
        self.fs_supports_append = hasattr(self.fs, "append")
        self.buff = None
        # The buffer offset and the buffer chunk size are measured in the
        # natural units of the underlying stream, i.e. bytes for binary mode,
        # or characters in text mode.
        self.buff_chunk_size = _DEFAULT_BLOCK_SIZE
        self.buff_offset = 0
        self.continuation_token = None
        self.write_temp = None
        self.write_started = False
        self.binary_mode = "b" in mode
        self.write_mode = "w" in mode
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        self.buff = None
        self.buff_offset = 0
        self.continuation_token = None

    def __iter__(self):
        return self

    def _read_buffer_to_offset(self, new_buff_offset):
        old_buff_offset = self.buff_offset
        read_size = min(len(self.buff), new_buff_offset) - old_buff_offset
        self.buff_offset += read_size
        return self.buff[old_buff_offset : old_buff_offset + read_size]

    def read(self, n=None):
        """Reads contents of file to a string.

        Args:
            n: int, number of bytes or characters to read, otherwise
                read all the contents of the file

        Returns:
            Subset of the contents of the file as a string or bytes.
        """
        if self.write_mode:
            raise errors.PermissionDeniedError(
                None, None, "File not opened in read mode"
            )

        result = None
        if self.buff and len(self.buff) > self.buff_offset:
            # read from local buffer
            if n is not None:
                chunk = self._read_buffer_to_offset(self.buff_offset + n)
                if len(chunk) == n:
                    return chunk
                result = chunk
                n -= len(chunk)
            else:
                # add all local buffer and update offsets
                result = self._read_buffer_to_offset(len(self.buff))

        # read from filesystem
        read_size = max(self.buff_chunk_size, n) if n is not None else None
        (self.buff, self.continuation_token) = self.fs.read(
            self.filename, self.binary_mode, read_size, self.continuation_token
        )
        self.buff_offset = 0

        # add from filesystem
        if n is not None:
            chunk = self._read_buffer_to_offset(n)
        else:
            # add all local buffer and update offsets
            chunk = self._read_buffer_to_offset(len(self.buff))
        result = result + chunk if result else chunk

        return result

    def write(self, file_content):
        """Writes string file contents to file, clearing contents of the file
        on first write and then appending on subsequent calls.

        Args:
            file_content: string, the contents
        """
        if not self.write_mode:
            raise errors.PermissionDeniedError(
                None, None, "File not opened in write mode"
            )
        if self.closed:
            raise errors.FailedPreconditionError(
                None, None, "File already closed"
            )

        if self.fs_supports_append:
            if not self.write_started:
                # write the first chunk to truncate file if it already exists
                self.fs.write(self.filename, file_content, self.binary_mode)
                self.write_started = True

            else:
                # append the later chunks
                self.fs.append(self.filename, file_content, self.binary_mode)
        else:
            # add to temp file, but wait for flush to write to final filesystem
            if self.write_temp is None:
                mode = "w+b" if self.binary_mode else "w+"
                self.write_temp = tempfile.TemporaryFile(mode)

            compatify = compat.as_bytes if self.binary_mode else compat.as_text
            self.write_temp.write(compatify(file_content))

    def __next__(self):
        line = None
        while True:
            if not self.buff:
                # read one unit into the buffer
                line = self.read(1)
                if line and (line[-1] == "\n" or not self.buff):
                    return line
                if not self.buff:
                    raise StopIteration()
            else:
                index = self.buff.find("\n", self.buff_offset)
                if index != -1:
                    # include line until now plus newline
                    chunk = self.read(index + 1 - self.buff_offset)
                    line = line + chunk if line else chunk
                    return line

                # read one unit past end of buffer
                chunk = self.read(len(self.buff) + 1 - self.buff_offset)
                line = line + chunk if line else chunk
                if line and (line[-1] == "\n" or not self.buff):
                    return line
                if not self.buff:
                    raise StopIteration()

    def next(self):
        return self.__next__()

    def flush(self):
        if self.closed:
            raise errors.FailedPreconditionError(
                None, None, "File already closed"
            )

        if not self.fs_supports_append:
            if self.write_temp is not None:
                # read temp file from the beginning
                self.write_temp.flush()
                self.write_temp.seek(0)
                chunk = self.write_temp.read()
                if chunk is not None:
                    # write full contents and keep in temp file
                    self.fs.write(self.filename, chunk, self.binary_mode)
                    self.write_temp.seek(len(chunk))

    def close(self):
        self.flush()
        if self.write_temp is not None:
            self.write_temp.close()
            self.write_temp = None
            self.write_started = False
        self.closed = True
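
# Added usage sketch (not in the original module); the path is hypothetical.
# GFile mirrors the standard file API for the supported modes:
#
#     with GFile("/tmp/logs/out.txt", "w") as f:
#         f.write("hello\n")
#
#     with GFile("/tmp/logs/out.txt", "r") as f:
#         for line in f:
#             print(line, end="")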


def exists(filename):
    """Determines whether a path exists or not.

    Args:
        filename: string, a path

    Returns:
        True if the path exists, whether it's a file or a directory.
        False if the path does not exist and there are no filesystem errors.

    Raises:
        errors.OpError: Propagates any errors reported by the FileSystem API.
    """
    return get_filesystem(filename).exists(filename)


def glob(filename):
    """Returns a list of files that match the given pattern(s).

    Args:
        filename: string or iterable of strings. The glob pattern(s).

    Returns:
        A list of strings containing filenames that match the given pattern(s).

    Raises:
        errors.OpError: If there are filesystem / directory listing errors.
    """
    return get_filesystem(filename).glob(filename)


def isdir(dirname):
    """Returns whether the path is a directory or not.

    Args:
        dirname: string, path to a potential directory

    Returns:
        True, if the path is a directory; False otherwise
    """
    return get_filesystem(dirname).isdir(dirname)


def listdir(dirname):
    """Returns a list of entries contained within a directory.

    The list is in arbitrary order. It does not contain the special entries "."
    and "..".

    Args:
        dirname: string, path to a directory

    Returns:
        [filename1, filename2, ... filenameN] as strings

    Raises:
        errors.NotFoundError if the directory doesn't exist
    """
    return get_filesystem(dirname).listdir(dirname)


def makedirs(path):
    """Creates a directory and all parent/intermediate directories.

    It succeeds if path already exists and is writable.

    Args:
        path: string, name of the directory to be created
    """
    return get_filesystem(path).makedirs(path)


def walk(top, topdown=True, onerror=None):
    """Recursive directory tree generator for directories.

    Args:
        top: string, a Directory name
        topdown: bool, Traverse pre order if True, post order if False.
        onerror: optional handler for errors. Should be a function, it will be
            called with the error as argument. Rethrowing the error aborts the walk.

    Errors that happen while listing directories are ignored.

    Yields:
        Each yield is a 3-tuple: the pathname of a directory, followed by lists
        of all its subdirectories and leaf files.
        (dirname, [subdirname, subdirname, ...], [filename, filename, ...])
        as strings
    """
    top = compat.as_str_any(top)
    fs = get_filesystem(top)
    try:
        listing = listdir(top)
    except errors.NotFoundError as err:
        if onerror:
            onerror(err)
        else:
            return

    files = []
    subdirs = []
    for item in listing:
        full_path = fs.join(top, compat.as_str_any(item))
        if isdir(full_path):
            subdirs.append(item)
        else:
            files.append(item)

    here = (top, subdirs, files)

    if topdown:
        yield here

    for subdir in subdirs:
        joined_subdir = fs.join(top, compat.as_str_any(subdir))
        for subitem in walk(joined_subdir, topdown, onerror=onerror):
            yield subitem

    if not topdown:
        yield here
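
# Added walk example (not in the original module); the directory layout is
# hypothetical:
#
#     for dirname, subdirs, files in walk("/tmp/logs"):
#         print(dirname, subdirs, files)
#     # /tmp/logs ['run1'] []
#     # /tmp/logs/run1 [] ['events.out.tfevents']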


def stat(filename):
    """Returns file statistics for a given path.

    Args:
        filename: string, path to a file

    Returns:
        FileStatistics struct that contains information about the path

    Raises:
        errors.OpError: If the operation fails.
    """
    return get_filesystem(filename).stat(filename)


# Used for tests only
def _write_string_to_file(filename, file_content):
    """Writes a string to a given file.

    Args:
        filename: string, path to a file
        file_content: string, contents that need to be written to the file

    Raises:
        errors.OpError: If there are errors during the operation.
    """
    with GFile(filename, mode="w") as f:
        f.write(compat.as_text(file_content))


# Used for tests only
def _read_file_to_string(filename, binary_mode=False):
    """Reads the entire contents of a file to a string.

    Args:
        filename: string, path to a file
        binary_mode: whether to open the file in binary mode or not. This changes
            the type of the object returned.

    Returns:
        contents of the file as a string or bytes.

    Raises:
        errors.OpError: Raises variety of errors that are subtypes e.g.
        `NotFoundError` etc.
    """
    if binary_mode:
        f = GFile(filename, mode="rb")
    else:
        f = GFile(filename, mode="r")
    return f.read()