Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/fsspec/transaction.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

51 statements  

1from collections import deque 

2 

3 

class Transaction:
    """Filesystem transaction write context

    Gathers files for deferred commit or discard, so that several write
    operations can be finalized semi-atomically. This works by having this
    instance as the ``.transaction`` attribute of the given filesystem
    """

    def __init__(self, fs, **kwargs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        **kwargs:
            Accepted and ignored by this class.
        """
        self.fs = fs
        # Deferred files, finalized in FIFO order by ``complete``.
        self.files = deque()

    def __enter__(self):
        """Start the transaction and return this instance"""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """End transaction and commit, if exit is not due to exception

        The filesystem is always taken out of transaction mode, even when
        committing or discarding one of the deferred files raises; that
        exception still propagates to the caller.
        """
        try:
            # only commit if there was no exception
            self.complete(commit=exc_type is None)
        finally:
            # ``complete`` only releases the filesystem on success; make sure
            # a failing commit/discard does not leave it stuck in a
            # transaction.
            self._release_fs()

    def start(self):
        """Start a transaction on this FileSystem"""
        self.files = deque()  # clean up after previous failed completions
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, call ``commit()`` on every deferred file; otherwise call
            ``discard()``. Files are processed in the order they were added.

        Calling this again after a successful completion is a no-op. If a
        file raises, the exception propagates, the remaining files stay
        queued and the filesystem is left untouched.
        """
        while self.files:
            f = self.files.popleft()
            if commit:
                f.commit()
            else:
                f.discard()
        self._release_fs()

    def _release_fs(self):
        """Take the filesystem out of transaction mode and drop our reference"""
        fs = self.fs
        if fs is None:
            # already completed; nothing left to reset
            return
        fs._intrans = False
        fs._transaction = None
        self.fs = None

50 

51 

class FileActor:
    """Container of deferred files that are committed or discarded together

    ``DaskTransaction`` instantiates this class through
    ``client.submit(FileActor, actor=True)``.
    """

    def __init__(self):
        self.files = []

    def _finish_all(self, action):
        """Call the method named ``action`` on each held file, then empty the list

        If one of the calls raises, the list is left as it was.
        """
        held = self.files
        for entry in held:
            getattr(entry, action)()
        held.clear()

    def commit(self):
        """Commit every held file, in insertion order, and forget them"""
        self._finish_all("commit")

    def discard(self):
        """Discard every held file, in insertion order, and forget them"""
        self._finish_all("discard")

    def append(self, f):
        """Add ``f`` to the files awaiting commit or discard"""
        self.files.append(f)

68 

69 

class DaskTransaction(Transaction):
    """Transaction whose deferred files are held by a ``FileActor``

    The actor is created through the default ``distributed`` client, and
    committing or discarding is delegated to it.
    """

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        import distributed

        super().__init__(fs)
        client = distributed.default_client()
        # Replace the local deque created by the base class with the actor
        # returned by the client.
        self.files = client.submit(FileActor, actor=True).result()

    def start(self):
        """Start a transaction on this FileSystem

        The base implementation rebinds ``self.files`` to a new local
        ``deque``. Doing that here would throw away the actor created in
        ``__init__`` and make ``complete`` fail, because a deque has no
        ``commit``/``discard`` methods, so only the filesystem flag is set.
        """
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, ask the actor to commit its files; otherwise ask it to
            discard them. The call waits for the actor's result.

        On success the filesystem is taken out of transaction mode and its
        reference to this transaction is cleared, matching
        ``Transaction.complete``. Calling this again afterwards does not
        touch the filesystem.
        """
        if commit:
            self.files.commit().result()
        else:
            self.files.discard().result()
        fs = self.fs
        if fs is None:
            # already completed; nothing left to reset
            return
        fs._intrans = False
        fs._transaction = None
        self.fs = None