Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/transaction.py: 33%

46 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1from collections import deque 

2 

3 

class Transaction:
    """Filesystem transaction write context

    Gathers files for deferred commit or discard, so that several write
    operations can be finalized semi-atomically. This works by having this
    instance as the ``.transaction`` attribute of the given filesystem
    """

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        self.fs = fs
        # Deferred files, finalized in the order they were added.
        self.files = deque()

    def __enter__(self):
        """Start the transaction and return this instance."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """End transaction and commit, if exit is not due to exception

        The filesystem's ``_intrans`` and ``_transaction`` attributes are
        reset even when finalizing a deferred file raises; that exception
        still propagates to the caller.
        """
        try:
            # only commit if there was no exception
            self.complete(commit=exc_type is None)
        finally:
            # Without the ``finally``, a failing commit()/discard() would skip
            # this reset and leave the filesystem flagged as in-transaction
            # after the ``with`` block has already ended.
            self.fs._intrans = False
            self.fs._transaction = None

    def start(self):
        """Start a transaction on this FileSystem"""
        self.files = deque()  # clean up after previous failed completions
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, call ``commit()`` on each deferred file; otherwise call
            ``discard()``.
        """
        # Each file is removed from the queue before it is finalized, so a
        # file whose finalization raises is not retried by a later call.
        while self.files:
            f = self.files.popleft()
            if commit:
                f.commit()
            else:
                f.discard()
        self.fs._intrans = False

46 

47 

class FileActor:
    """Holds a list of deferred files and finalizes them together."""

    def __init__(self):
        # Deferred files, in the order they were appended.
        self.files = []

    def commit(self):
        """Call ``commit()`` on every held file, then empty the list."""
        pending = self.files
        for handle in pending:
            handle.commit()
        pending.clear()

    def discard(self):
        """Call ``discard()`` on every held file, then empty the list."""
        pending = self.files
        for handle in pending:
            handle.discard()
        pending.clear()

    def append(self, f):
        """Add ``f`` to the end of the list of held files."""
        self.files.append(f)

64 

65 

class DaskTransaction(Transaction):
    """Transaction whose deferred files are held by a ``FileActor``
    submitted to the default ``distributed`` client.
    """

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        import distributed

        super().__init__(fs)
        dask_client = distributed.default_client()
        # Submit FileActor as an actor and wait for the result; it replaces
        # the deque assigned by the base class.
        actor_future = dask_client.submit(FileActor, actor=True)
        self.files = actor_future.result()

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files"""
        # Choose the actor method, call it, and block on its result.
        finalize = self.files.commit if commit else self.files.discard
        finalize().result()
        self.fs._intrans = False