Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyrsistent/_precord.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1from typing import Any 

2from pyrsistent._checked_types import CheckedType, _restore_pickle, InvariantException, store_invariants 

3from pyrsistent._field_common import ( 

4 set_fields, check_type, is_field_ignore_extra_complaint, PFIELD_NO_INITIAL, serialize, check_global_invariants 

5) 

6from pyrsistent._pmap import PMap, pmap 

7 

8 

9class _PRecordMeta(type): 

10 def __new__(mcs, name, bases, dct): 

11 set_fields(dct, bases, name='_precord_fields') 

12 store_invariants(dct, bases, '_precord_invariants', '__invariant__') 

13 dct['_precord_mandatory_fields'] = \ 

14 set(name for name, field in dct['_precord_fields'].items() if field.mandatory) 

15 dct['_precord_initial_values'] = \ 

16 dict((k, field.initial) for k, field in dct['_precord_fields'].items() if field.initial is not PFIELD_NO_INITIAL) 

17 dct['__slots__'] = () 

18 return super(_PRecordMeta, mcs).__new__(mcs, name, bases, dct) 

19 

20 

21class PRecord(PMap[str, Any], CheckedType, metaclass=_PRecordMeta): 

22 """ 

23 A PRecord is a PMap with a fixed set of specified fields. Records are declared as python classes inheriting 

24 from PRecord. Because it is a PMap it has full support for all Mapping methods such as iteration and element 

25 access using subscript notation. 

26 

27 More documentation and examples of PRecord usage is available at https://github.com/tobgu/pyrsistent 

28 """ 

29 def __new__(cls, **kwargs): 

30 # Hack total! If these two special attributes exist that means we can create 

31 # ourselves. Otherwise we need to go through the Evolver to create the structures 

32 # for us. 

33 if '_precord_size' in kwargs and '_precord_buckets' in kwargs: 

34 return super(PRecord, cls).__new__(cls, kwargs['_precord_size'], kwargs['_precord_buckets']) 

35 

36 factory_fields = kwargs.pop('_factory_fields', None) 

37 ignore_extra = kwargs.pop('_ignore_extra', False) 

38 initial_values = kwargs 

39 if cls._precord_initial_values: 

40 initial_values = dict((k, v() if callable(v) else v) 

41 for k, v in cls._precord_initial_values.items()) 

42 initial_values.update(kwargs) 

43 

44 e = _PRecordEvolver(cls, pmap(pre_size=len(cls._precord_fields)), _factory_fields=factory_fields, _ignore_extra=ignore_extra) 

45 for k, v in initial_values.items(): 

46 e[k] = v 

47 

48 return e.persistent() 

49 

50 def set(self, *args, **kwargs): 

51 """ 

52 Set a field in the record. This set function differs slightly from that in the PMap 

53 class. First of all it accepts key-value pairs. Second it accepts multiple key-value 

54 pairs to perform one, atomic, update of multiple fields. 

55 """ 

56 # The PRecord set() can accept kwargs since all fields that have been declared are 

57 # valid python identifiers. Also allow multiple fields to be set in one operation. 

58 if args: 

59 return super(PRecord, self).set(args[0], args[1]) 

60 

61 return self.update(kwargs) 

62 

63 def evolver(self): 

64 """ 

65 Returns an evolver of this object. 

66 """ 

67 return _PRecordEvolver(self.__class__, self) 

68 

69 def __repr__(self): 

70 return "{0}({1})".format(self.__class__.__name__, 

71 ', '.join('{0}={1}'.format(k, repr(v)) for k, v in self.items())) 

72 

73 @classmethod 

74 def create(cls, kwargs, _factory_fields=None, ignore_extra=False): 

75 """ 

76 Factory method. Will create a new PRecord of the current type and assign the values 

77 specified in kwargs. 

78 

79 :param ignore_extra: A boolean which when set to True will ignore any keys which appear in kwargs that are not 

80 in the set of fields on the PRecord. 

81 """ 

82 if isinstance(kwargs, cls): 

83 return kwargs 

84 

85 if ignore_extra: 

86 kwargs = {k: kwargs[k] for k in cls._precord_fields if k in kwargs} 

87 

88 return cls(_factory_fields=_factory_fields, _ignore_extra=ignore_extra, **kwargs) 

89 

90 def __reduce__(self): 

91 # Pickling support 

92 return _restore_pickle, (self.__class__, dict(self),) 

93 

94 def serialize(self, format=None): 

95 """ 

96 Serialize the current PRecord using custom serializer functions for fields where 

97 such have been supplied. 

98 """ 

99 return dict((k, serialize(self._precord_fields[k].serializer, format, v)) for k, v in self.items()) 

100 

101 

102class _PRecordEvolver(PMap._Evolver): 

103 __slots__ = ('_destination_cls', '_invariant_error_codes', '_missing_fields', '_factory_fields', '_ignore_extra') 

104 

105 def __init__(self, cls, original_pmap, _factory_fields=None, _ignore_extra=False): 

106 super(_PRecordEvolver, self).__init__(original_pmap) 

107 self._destination_cls = cls 

108 self._invariant_error_codes = [] 

109 self._missing_fields = [] 

110 self._factory_fields = _factory_fields 

111 self._ignore_extra = _ignore_extra 

112 

113 def __setitem__(self, key, original_value): 

114 self.set(key, original_value) 

115 

116 def set(self, key, original_value): 

117 field = self._destination_cls._precord_fields.get(key) 

118 if field: 

119 if self._factory_fields is None or field in self._factory_fields: 

120 try: 

121 if is_field_ignore_extra_complaint(CheckedType, field, self._ignore_extra): 

122 value = field.factory(original_value, ignore_extra=self._ignore_extra) 

123 else: 

124 value = field.factory(original_value) 

125 except InvariantException as e: 

126 self._invariant_error_codes += e.invariant_errors 

127 self._missing_fields += e.missing_fields 

128 return self 

129 else: 

130 value = original_value 

131 

132 check_type(self._destination_cls, field, key, value) 

133 is_ok, error_code = field.invariant(value) 

134 if not is_ok: 

135 self._invariant_error_codes.append(error_code) 

136 

137 return super(_PRecordEvolver, self).set(key, value) 

138 else: 

139 raise AttributeError("'{0}' is not among the specified fields for {1}".format(key, self._destination_cls.__name__)) 

140 

141 def persistent(self): 

142 cls = self._destination_cls 

143 is_dirty = self.is_dirty() 

144 pm = super(_PRecordEvolver, self).persistent() 

145 

146 if is_dirty or not isinstance(pm, cls): 

147 result = cls(_precord_buckets=pm._buckets, _precord_size=pm._size) 

148 else: 

149 result = pm 

150 

151 if cls._precord_mandatory_fields: 

152 self._missing_fields += tuple('{0}.{1}'.format(cls.__name__, f) for f 

153 in (cls._precord_mandatory_fields - set(result.keys()))) 

154 

155 if self._invariant_error_codes or self._missing_fields: 

156 raise InvariantException(tuple(self._invariant_error_codes), tuple(self._missing_fields), 

157 'Field invariant failed') 

158 

159 check_global_invariants(result, cls._precord_invariants) 

160 

161 return result