Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/transaction.py: 33%
46 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1from collections import deque
4class Transaction:
5 """Filesystem transaction write context
7 Gathers files for deferred commit or discard, so that several write
8 operations can be finalized semi-atomically. This works by having this
9 instance as the ``.transaction`` attribute of the given filesystem
10 """
12 def __init__(self, fs):
13 """
14 Parameters
15 ----------
16 fs: FileSystem instance
17 """
18 self.fs = fs
19 self.files = deque()
21 def __enter__(self):
22 self.start()
23 return self
25 def __exit__(self, exc_type, exc_val, exc_tb):
26 """End transaction and commit, if exit is not due to exception"""
27 # only commit if there was no exception
28 self.complete(commit=exc_type is None)
29 self.fs._intrans = False
30 self.fs._transaction = None
32 def start(self):
33 """Start a transaction on this FileSystem"""
34 self.files = deque() # clean up after previous failed completions
35 self.fs._intrans = True
37 def complete(self, commit=True):
38 """Finish transaction: commit or discard all deferred files"""
39 while self.files:
40 f = self.files.popleft()
41 if commit:
42 f.commit()
43 else:
44 f.discard()
45 self.fs._intrans = False
48class FileActor:
49 def __init__(self):
50 self.files = []
52 def commit(self):
53 for f in self.files:
54 f.commit()
55 self.files.clear()
57 def discard(self):
58 for f in self.files:
59 f.discard()
60 self.files.clear()
62 def append(self, f):
63 self.files.append(f)
66class DaskTransaction(Transaction):
67 def __init__(self, fs):
68 """
69 Parameters
70 ----------
71 fs: FileSystem instance
72 """
73 import distributed
75 super().__init__(fs)
76 client = distributed.default_client()
77 self.files = client.submit(FileActor, actor=True).result()
79 def complete(self, commit=True):
80 """Finish transaction: commit or discard all deferred files"""
81 if commit:
82 self.files.commit().result()
83 else:
84 self.files.discard().result()
85 self.fs._intrans = False