1from collections import deque
2
3
4class Transaction:
5 """Filesystem transaction write context
6
7 Gathers files for deferred commit or discard, so that several write
8 operations can be finalized semi-atomically. This works by having this
9 instance as the ``.transaction`` attribute of the given filesystem
10 """
11
12 def __init__(self, fs, **kwargs):
13 """
14 Parameters
15 ----------
16 fs: FileSystem instance
17 """
18 self.fs = fs
19 self.files = deque()
20
21 def __enter__(self):
22 self.start()
23 return self
24
25 def __exit__(self, exc_type, exc_val, exc_tb):
26 """End transaction and commit, if exit is not due to exception"""
27 # only commit if there was no exception
28 self.complete(commit=exc_type is None)
29 if self.fs:
30 self.fs._intrans = False
31 self.fs._transaction = None
32 self.fs = None
33
34 def start(self):
35 """Start a transaction on this FileSystem"""
36 self.files = deque() # clean up after previous failed completions
37 self.fs._intrans = True
38
39 def complete(self, commit=True):
40 """Finish transaction: commit or discard all deferred files"""
41 while self.files:
42 f = self.files.popleft()
43 if commit:
44 f.commit()
45 else:
46 f.discard()
47 self.fs._intrans = False
48 self.fs._transaction = None
49 self.fs = None
50
51
52class FileActor:
53 def __init__(self):
54 self.files = []
55
56 def commit(self):
57 for f in self.files:
58 f.commit()
59 self.files.clear()
60
61 def discard(self):
62 for f in self.files:
63 f.discard()
64 self.files.clear()
65
66 def append(self, f):
67 self.files.append(f)
68
69
70class DaskTransaction(Transaction):
71 def __init__(self, fs):
72 """
73 Parameters
74 ----------
75 fs: FileSystem instance
76 """
77 import distributed
78
79 super().__init__(fs)
80 client = distributed.default_client()
81 self.files = client.submit(FileActor, actor=True).result()
82
83 def complete(self, commit=True):
84 """Finish transaction: commit or discard all deferred files"""
85 if commit:
86 self.files.commit().result()
87 else:
88 self.files.discard().result()
89 self.fs._intrans = False
90 self.fs = None