Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/fsspec/transaction.py: 32%

44 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

class Transaction(object):
    """Filesystem transaction write context

    Gathers files for deferred commit or discard, so that several write
    operations can be finalized semi-atomically. This works by having this
    instance as the ``.transaction`` attribute of the given filesystem
    """

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        self.fs = fs
        # Deferred file objects; each must provide ``commit()`` and
        # ``discard()``, which ``complete`` calls.
        self.files = []

    def __enter__(self):
        """Start the transaction and return this instance

        Returning ``self`` means ``with transaction as t:`` binds ``t`` to
        the transaction object instead of ``None``.
        """
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """End transaction and commit, if exit is not due to exception"""
        try:
            # only commit if there was no exception
            self.complete(commit=exc_type is None)
        finally:
            # Detach from the filesystem even when a commit/discard call
            # raised, so it is not left flagged as being in a transaction.
            self.fs._intrans = False
            self.fs._transaction = None

    def start(self):
        """Start a transaction on this FileSystem"""
        self.files = []  # clean up after previous failed completions
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, call ``commit()`` on every deferred file; otherwise
            call ``discard()`` on each of them.
        """
        for f in self.files:
            if commit:
                f.commit()
            else:
                f.discard()
        self.files = []
        self.fs._intrans = False

42 

43 

class FileActor(object):
    """Holds deferred files and commits or discards them as a group."""

    def __init__(self):
        self.files = []

    def _finalize_all(self, action):
        """Call the method named ``action`` on every held file, in order,
        then empty the list in place."""
        for pending in self.files:
            getattr(pending, action)()
        self.files.clear()

    def commit(self):
        """Commit every held file, then forget them."""
        self._finalize_all("commit")

    def discard(self):
        """Discard every held file, then forget them."""
        self._finalize_all("discard")

    def append(self, f):
        """Add ``f`` to the end of the held files."""
        self.files.append(f)

60 

61 

class DaskTransaction(Transaction):
    """Transaction whose deferred files are held by a ``FileActor``
    submitted to the default ``distributed`` client.

    NOTE(review): ``self.files`` is presumably a dask actor handle whose
    method calls return objects with ``.result()`` - confirm against the
    ``distributed`` actors documentation.
    """

    def __init__(self, fs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        """
        super().__init__(fs)
        self.files = self._new_file_actor()

    @staticmethod
    def _new_file_actor():
        """Submit a fresh ``FileActor`` to the default client and return
        the result of that submission."""
        import distributed

        client = distributed.default_client()
        return client.submit(FileActor, actor=True).result()

    def start(self):
        """Start a transaction on this FileSystem

        The inherited ``start`` rebinds ``self.files`` to a plain list,
        which has no ``commit``/``discard`` methods, so ``complete`` would
        fail after entering the context manager. Here the stale holder is
        replaced with a fresh ``FileActor`` instead, so files left over
        from a previous failed completion are still dropped.
        """
        self.files = self._new_file_actor()
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, ask the file holder to commit; otherwise to discard.
            Either way the call's ``.result()`` is waited on before the
            filesystem is flagged as no longer in a transaction.
        """
        if commit:
            self.files.commit().result()
        else:
            self.files.discard().result()
        self.fs._intrans = False