Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyrsistent/_field_common.py: 28%

143 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1from pyrsistent._checked_types import ( 

2 CheckedPMap, 

3 CheckedPSet, 

4 CheckedPVector, 

5 CheckedType, 

6 InvariantException, 

7 _restore_pickle, 

8 get_type, 

9 maybe_parse_user_type, 

10 maybe_parse_many_user_types, 

11) 

12from pyrsistent._checked_types import optional as optional_type 

13from pyrsistent._checked_types import wrap_invariant 

14import inspect 

15 

16 

17def set_fields(dct, bases, name): 

18 dct[name] = dict(sum([list(b.__dict__.get(name, {}).items()) for b in bases], [])) 

19 

20 for k, v in list(dct.items()): 

21 if isinstance(v, _PField): 

22 dct[name][k] = v 

23 del dct[k] 

24 

25 

26def check_global_invariants(subject, invariants): 

27 error_codes = tuple(error_code for is_ok, error_code in 

28 (invariant(subject) for invariant in invariants) if not is_ok) 

29 if error_codes: 

30 raise InvariantException(error_codes, (), 'Global invariant failed') 

31 

32 

33def serialize(serializer, format, value): 

34 if isinstance(value, CheckedType) and serializer is PFIELD_NO_SERIALIZER: 

35 return value.serialize(format) 

36 

37 return serializer(format, value) 

38 

39 

40def check_type(destination_cls, field, name, value): 

41 if field.type and not any(isinstance(value, get_type(t)) for t in field.type): 

42 actual_type = type(value) 

43 message = "Invalid type for field {0}.{1}, was {2}".format(destination_cls.__name__, name, actual_type.__name__) 

44 raise PTypeError(destination_cls, name, field.type, actual_type, message) 

45 

46 

47def is_type_cls(type_cls, field_type): 

48 if type(field_type) is set: 

49 return True 

50 types = tuple(field_type) 

51 if len(types) == 0: 

52 return False 

53 return issubclass(get_type(types[0]), type_cls) 

54 

55 

56def is_field_ignore_extra_complaint(type_cls, field, ignore_extra): 

57 # ignore_extra param has default False value, for speed purpose no need to propagate False 

58 if not ignore_extra: 

59 return False 

60 

61 if not is_type_cls(type_cls, field.type): 

62 return False 

63 

64 return 'ignore_extra' in inspect.signature(field.factory).parameters 

65 

66 

67 

68class _PField(object): 

69 __slots__ = ('type', 'invariant', 'initial', 'mandatory', '_factory', 'serializer') 

70 

71 def __init__(self, type, invariant, initial, mandatory, factory, serializer): 

72 self.type = type 

73 self.invariant = invariant 

74 self.initial = initial 

75 self.mandatory = mandatory 

76 self._factory = factory 

77 self.serializer = serializer 

78 

79 @property 

80 def factory(self): 

81 # If no factory is specified and the type is another CheckedType use the factory method of that CheckedType 

82 if self._factory is PFIELD_NO_FACTORY and len(self.type) == 1: 

83 typ = get_type(tuple(self.type)[0]) 

84 if issubclass(typ, CheckedType): 

85 return typ.create 

86 

87 return self._factory 

88 

89PFIELD_NO_TYPE = () 

90PFIELD_NO_INVARIANT = lambda _: (True, None) 

91PFIELD_NO_FACTORY = lambda x: x 

92PFIELD_NO_INITIAL = object() 

93PFIELD_NO_SERIALIZER = lambda _, value: value 

94 

95 

96def field(type=PFIELD_NO_TYPE, invariant=PFIELD_NO_INVARIANT, initial=PFIELD_NO_INITIAL, 

97 mandatory=False, factory=PFIELD_NO_FACTORY, serializer=PFIELD_NO_SERIALIZER): 

98 """ 

99 Field specification factory for :py:class:`PRecord`. 

100 

101 :param type: a type or iterable with types that are allowed for this field 

102 :param invariant: a function specifying an invariant that must hold for the field 

103 :param initial: value of field if not specified when instantiating the record 

104 :param mandatory: boolean specifying if the field is mandatory or not 

105 :param factory: function called when field is set. 

106 :param serializer: function that returns a serialized version of the field 

107 """ 

108 

109 # NB: We have to check this predicate separately from the predicates in 

110 # `maybe_parse_user_type` et al. because this one is related to supporting 

111 # the argspec for `field`, while those are related to supporting the valid 

112 # ways to specify types. 

113 

114 # Multiple types must be passed in one of the following containers. Note 

115 # that a type that is a subclass of one of these containers, like a 

116 # `collections.namedtuple`, will work as expected, since we check 

117 # `isinstance` and not `issubclass`. 

118 if isinstance(type, (list, set, tuple)): 

119 types = set(maybe_parse_many_user_types(type)) 

120 else: 

121 types = set(maybe_parse_user_type(type)) 

122 

123 invariant_function = wrap_invariant(invariant) if invariant != PFIELD_NO_INVARIANT and callable(invariant) else invariant 

124 field = _PField(type=types, invariant=invariant_function, initial=initial, 

125 mandatory=mandatory, factory=factory, serializer=serializer) 

126 

127 _check_field_parameters(field) 

128 

129 return field 

130 

131 

132def _check_field_parameters(field): 

133 for t in field.type: 

134 if not isinstance(t, type) and not isinstance(t, str): 

135 raise TypeError('Type parameter expected, not {0}'.format(type(t))) 

136 

137 if field.initial is not PFIELD_NO_INITIAL and \ 

138 not callable(field.initial) and \ 

139 field.type and not any(isinstance(field.initial, t) for t in field.type): 

140 raise TypeError('Initial has invalid type {0}'.format(type(field.initial))) 

141 

142 if not callable(field.invariant): 

143 raise TypeError('Invariant must be callable') 

144 

145 if not callable(field.factory): 

146 raise TypeError('Factory must be callable') 

147 

148 if not callable(field.serializer): 

149 raise TypeError('Serializer must be callable') 

150 

151 

152class PTypeError(TypeError): 

153 """ 

154 Raised when trying to assign a value with a type that doesn't match the declared type. 

155 

156 Attributes: 

157 source_class -- The class of the record 

158 field -- Field name 

159 expected_types -- Types allowed for the field 

160 actual_type -- The non matching type 

161 """ 

162 def __init__(self, source_class, field, expected_types, actual_type, *args, **kwargs): 

163 super(PTypeError, self).__init__(*args, **kwargs) 

164 self.source_class = source_class 

165 self.field = field 

166 self.expected_types = expected_types 

167 self.actual_type = actual_type 

168 

169 

170SEQ_FIELD_TYPE_SUFFIXES = { 

171 CheckedPVector: "PVector", 

172 CheckedPSet: "PSet", 

173} 

174 

175# Global dictionary to hold auto-generated field types: used for unpickling 

176_seq_field_types = {} 

177 

178def _restore_seq_field_pickle(checked_class, item_type, data): 

179 """Unpickling function for auto-generated PVec/PSet field types.""" 

180 type_ = _seq_field_types[checked_class, item_type] 

181 return _restore_pickle(type_, data) 

182 

183def _types_to_names(types): 

184 """Convert a tuple of types to a human-readable string.""" 

185 return "".join(get_type(typ).__name__.capitalize() for typ in types) 

186 

187def _make_seq_field_type(checked_class, item_type, item_invariant): 

188 """Create a subclass of the given checked class with the given item type.""" 

189 type_ = _seq_field_types.get((checked_class, item_type)) 

190 if type_ is not None: 

191 return type_ 

192 

193 class TheType(checked_class): 

194 __type__ = item_type 

195 __invariant__ = item_invariant 

196 

197 def __reduce__(self): 

198 return (_restore_seq_field_pickle, 

199 (checked_class, item_type, list(self))) 

200 

201 suffix = SEQ_FIELD_TYPE_SUFFIXES[checked_class] 

202 TheType.__name__ = _types_to_names(TheType._checked_types) + suffix 

203 _seq_field_types[checked_class, item_type] = TheType 

204 return TheType 

205 

206def _sequence_field(checked_class, item_type, optional, initial, 

207 invariant=PFIELD_NO_INVARIANT, 

208 item_invariant=PFIELD_NO_INVARIANT): 

209 """ 

210 Create checked field for either ``PSet`` or ``PVector``. 

211 

212 :param checked_class: ``CheckedPSet`` or ``CheckedPVector``. 

213 :param item_type: The required type for the items in the set. 

214 :param optional: If true, ``None`` can be used as a value for 

215 this field. 

216 :param initial: Initial value to pass to factory. 

217 

218 :return: A ``field`` containing a checked class. 

219 """ 

220 TheType = _make_seq_field_type(checked_class, item_type, item_invariant) 

221 

222 if optional: 

223 def factory(argument, _factory_fields=None, ignore_extra=False): 

224 if argument is None: 

225 return None 

226 else: 

227 return TheType.create(argument, _factory_fields=_factory_fields, ignore_extra=ignore_extra) 

228 else: 

229 factory = TheType.create 

230 

231 return field(type=optional_type(TheType) if optional else TheType, 

232 factory=factory, mandatory=True, 

233 invariant=invariant, 

234 initial=factory(initial)) 

235 

236 

237def pset_field(item_type, optional=False, initial=(), 

238 invariant=PFIELD_NO_INVARIANT, 

239 item_invariant=PFIELD_NO_INVARIANT): 

240 """ 

241 Create checked ``PSet`` field. 

242 

243 :param item_type: The required type for the items in the set. 

244 :param optional: If true, ``None`` can be used as a value for 

245 this field. 

246 :param initial: Initial value to pass to factory if no value is given 

247 for the field. 

248 

249 :return: A ``field`` containing a ``CheckedPSet`` of the given type. 

250 """ 

251 return _sequence_field(CheckedPSet, item_type, optional, initial, 

252 invariant=invariant, 

253 item_invariant=item_invariant) 

254 

255 

256def pvector_field(item_type, optional=False, initial=(), 

257 invariant=PFIELD_NO_INVARIANT, 

258 item_invariant=PFIELD_NO_INVARIANT): 

259 """ 

260 Create checked ``PVector`` field. 

261 

262 :param item_type: The required type for the items in the vector. 

263 :param optional: If true, ``None`` can be used as a value for 

264 this field. 

265 :param initial: Initial value to pass to factory if no value is given 

266 for the field. 

267 

268 :return: A ``field`` containing a ``CheckedPVector`` of the given type. 

269 """ 

270 return _sequence_field(CheckedPVector, item_type, optional, initial, 

271 invariant=invariant, 

272 item_invariant=item_invariant) 

273 

274 

275_valid = lambda item: (True, "") 

276 

277 

278# Global dictionary to hold auto-generated field types: used for unpickling 

279_pmap_field_types = {} 

280 

281def _restore_pmap_field_pickle(key_type, value_type, data): 

282 """Unpickling function for auto-generated PMap field types.""" 

283 type_ = _pmap_field_types[key_type, value_type] 

284 return _restore_pickle(type_, data) 

285 

286def _make_pmap_field_type(key_type, value_type): 

287 """Create a subclass of CheckedPMap with the given key and value types.""" 

288 type_ = _pmap_field_types.get((key_type, value_type)) 

289 if type_ is not None: 

290 return type_ 

291 

292 class TheMap(CheckedPMap): 

293 __key_type__ = key_type 

294 __value_type__ = value_type 

295 

296 def __reduce__(self): 

297 return (_restore_pmap_field_pickle, 

298 (self.__key_type__, self.__value_type__, dict(self))) 

299 

300 TheMap.__name__ = "{0}To{1}PMap".format( 

301 _types_to_names(TheMap._checked_key_types), 

302 _types_to_names(TheMap._checked_value_types)) 

303 _pmap_field_types[key_type, value_type] = TheMap 

304 return TheMap 

305 

306 

307def pmap_field(key_type, value_type, optional=False, invariant=PFIELD_NO_INVARIANT): 

308 """ 

309 Create a checked ``PMap`` field. 

310 

311 :param key: The required type for the keys of the map. 

312 :param value: The required type for the values of the map. 

313 :param optional: If true, ``None`` can be used as a value for 

314 this field. 

315 :param invariant: Pass-through to ``field``. 

316 

317 :return: A ``field`` containing a ``CheckedPMap``. 

318 """ 

319 TheMap = _make_pmap_field_type(key_type, value_type) 

320 

321 if optional: 

322 def factory(argument): 

323 if argument is None: 

324 return None 

325 else: 

326 return TheMap.create(argument) 

327 else: 

328 factory = TheMap.create 

329 

330 return field(mandatory=True, initial=TheMap(), 

331 type=optional_type(TheMap) if optional else TheMap, 

332 factory=factory, invariant=invariant)