Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/transaction.py: 32%
44 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1class Transaction(object):
2 """Filesystem transaction write context
4 Gathers files for deferred commit or discard, so that several write
5 operations can be finalized semi-atomically. This works by having this
6 instance as the ``.transaction`` attribute of the given filesystem
7 """
9 def __init__(self, fs):
10 """
11 Parameters
12 ----------
13 fs: FileSystem instance
14 """
15 self.fs = fs
16 self.files = []
18 def __enter__(self):
19 self.start()
21 def __exit__(self, exc_type, exc_val, exc_tb):
22 """End transaction and commit, if exit is not due to exception"""
23 # only commit if there was no exception
24 self.complete(commit=exc_type is None)
25 self.fs._intrans = False
26 self.fs._transaction = None
28 def start(self):
29 """Start a transaction on this FileSystem"""
30 self.files = [] # clean up after previous failed completions
31 self.fs._intrans = True
33 def complete(self, commit=True):
34 """Finish transaction: commit or discard all deferred files"""
35 for f in self.files:
36 if commit:
37 f.commit()
38 else:
39 f.discard()
40 self.files = []
41 self.fs._intrans = False
44class FileActor(object):
45 def __init__(self):
46 self.files = []
48 def commit(self):
49 for f in self.files:
50 f.commit()
51 self.files.clear()
53 def discard(self):
54 for f in self.files:
55 f.discard()
56 self.files.clear()
58 def append(self, f):
59 self.files.append(f)
62class DaskTransaction(Transaction):
63 def __init__(self, fs):
64 """
65 Parameters
66 ----------
67 fs: FileSystem instance
68 """
69 import distributed
71 super().__init__(fs)
72 client = distributed.default_client()
73 self.files = client.submit(FileActor, actor=True).result()
75 def complete(self, commit=True):
76 """Finish transaction: commit or discard all deferred files"""
77 if commit:
78 self.files.commit().result()
79 else:
80 self.files.discard().result()
81 self.fs._intrans = False