Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyrsistent/_field_common.py: 28%
143 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1from pyrsistent._checked_types import (
2 CheckedPMap,
3 CheckedPSet,
4 CheckedPVector,
5 CheckedType,
6 InvariantException,
7 _restore_pickle,
8 get_type,
9 maybe_parse_user_type,
10 maybe_parse_many_user_types,
11)
12from pyrsistent._checked_types import optional as optional_type
13from pyrsistent._checked_types import wrap_invariant
14import inspect
def set_fields(dct, bases, name):
    """
    Gather field definitions into ``dct[name]``.

    The mapping stored under ``name`` in each base's ``__dict__`` is merged
    first (entries from later bases override earlier ones). Every ``_PField``
    value found directly in ``dct`` is then moved into that mapping and
    removed from ``dct``.

    :param dct: namespace dictionary, modified in place
    :param bases: iterable of base classes to inherit field mappings from
    :param name: key under which the merged field mapping is stored
    """
    collected = {}
    for base in bases:
        collected.update(base.__dict__.get(name, {}).items())
    dct[name] = collected

    # Snapshot the matches first: entries are deleted from dct below.
    declared = [(key, value) for key, value in dct.items() if isinstance(value, _PField)]
    for key, value in declared:
        collected[key] = value
        del dct[key]
def check_global_invariants(subject, invariants):
    """
    Run every invariant against ``subject`` and raise if any of them fails.

    Each invariant is called with ``subject`` and must return an
    ``(is_ok, error_code)`` pair. All invariants are evaluated; the error
    codes of the failing ones are reported together.

    :param subject: object handed to each invariant
    :param invariants: iterable of callables returning ``(is_ok, error_code)``
    :raises InvariantException: if at least one invariant reports failure
    """
    failed_codes = []
    for invariant in invariants:
        is_ok, error_code = invariant(subject)
        if not is_ok:
            failed_codes.append(error_code)

    if failed_codes:
        raise InvariantException(tuple(failed_codes), (), 'Global invariant failed')
def serialize(serializer, format, value):
    """
    Serialize ``value`` using ``serializer``.

    When ``value`` is a ``CheckedType`` and no custom serializer was supplied
    (``serializer`` is the ``PFIELD_NO_SERIALIZER`` sentinel), the value's own
    ``serialize(format)`` method is used instead.

    :param serializer: callable taking ``(format, value)``
    :param format: passed through unchanged to the serializer
    :param value: the value to serialize
    :return: whatever the chosen serializer returns
    """
    if not (isinstance(value, CheckedType) and serializer is PFIELD_NO_SERIALIZER):
        return serializer(format, value)

    return value.serialize(format)
def check_type(destination_cls, field, name, value):
    """
    Verify that ``value`` is an instance of one of the types allowed by ``field``.

    A field with no declared types accepts anything.

    :param destination_cls: class owning the field (used in the error message)
    :param field: field specification; its ``type`` attribute lists allowed types
    :param name: field name (used in the error message)
    :param value: value about to be assigned
    :raises PTypeError: if types are declared and ``value`` matches none of them
    """
    if not field.type:
        return
    if any(isinstance(value, get_type(t)) for t in field.type):
        return

    actual_type = type(value)
    message = "Invalid type for field {0}.{1}, was {2}".format(
        destination_cls.__name__, name, actual_type.__name__)
    raise PTypeError(destination_cls, name, field.type, actual_type, message)
def is_type_cls(type_cls, field_type):
    """
    Tell whether ``field_type`` should be treated as being of class ``type_cls``.

    A plain ``set`` is always accepted. Any other iterable is judged by its
    first element only; an empty iterable is rejected.

    :param type_cls: class to test against
    :param field_type: iterable of types (or type names resolvable by ``get_type``)
    :return: ``True`` or ``False`` as described above
    """
    if type(field_type) is set:
        return True

    candidates = tuple(field_type)
    if not candidates:
        return False

    first = candidates[0]
    return issubclass(get_type(first), type_cls)
def is_field_ignore_extra_complaint(type_cls, field, ignore_extra):
    """
    Decide whether ``ignore_extra`` should be forwarded to ``field.factory``.

    :param type_cls: class the field's type is compared against via ``is_type_cls``
    :param field: field specification providing ``type`` and ``factory``
    :param ignore_extra: the caller's ``ignore_extra`` flag
    :return: ``True`` only if ``ignore_extra`` is truthy, the field type passes
        ``is_type_cls`` and the factory's signature has an ``ignore_extra``
        parameter; ``False`` otherwise
    """
    # ignore_extra defaults to False; bail out early so the common case does
    # not pay for the signature inspection.
    if ignore_extra and is_type_cls(type_cls, field.type):
        return 'ignore_extra' in inspect.signature(field.factory).parameters

    return False
68class _PField(object):
69 __slots__ = ('type', 'invariant', 'initial', 'mandatory', '_factory', 'serializer')
71 def __init__(self, type, invariant, initial, mandatory, factory, serializer):
72 self.type = type
73 self.invariant = invariant
74 self.initial = initial
75 self.mandatory = mandatory
76 self._factory = factory
77 self.serializer = serializer
79 @property
80 def factory(self):
81 # If no factory is specified and the type is another CheckedType use the factory method of that CheckedType
82 if self._factory is PFIELD_NO_FACTORY and len(self.type) == 1:
83 typ = get_type(tuple(self.type)[0])
84 if issubclass(typ, CheckedType):
85 return typ.create
87 return self._factory
# Default values for the arguments of ``field``. Several of them act as
# sentinels and are compared against elsewhere in this module to detect
# "not specified by the caller".

# No type restriction.
PFIELD_NO_TYPE = ()
# Invariant that always holds.
PFIELD_NO_INVARIANT = lambda _: (True, None)
# Identity factory: the value is used as given.
PFIELD_NO_FACTORY = lambda x: x
# Unique marker meaning "no initial value supplied".
PFIELD_NO_INITIAL = object()
# Serializer that returns the value unchanged, ignoring the format.
PFIELD_NO_SERIALIZER = lambda _, value: value
def field(type=PFIELD_NO_TYPE, invariant=PFIELD_NO_INVARIANT, initial=PFIELD_NO_INITIAL,
          mandatory=False, factory=PFIELD_NO_FACTORY, serializer=PFIELD_NO_SERIALIZER):
    """
    Field specification factory for :py:class:`PRecord`.

    :param type: a type or iterable with types that are allowed for this field
    :param invariant: a function specifying an invariant that must hold for the field
    :param initial: value of field if not specified when instantiating the record
    :param mandatory: boolean specifying if the field is mandatory or not
    :param factory: function called when field is set.
    :param serializer: function that returns a serialized version of the field
    :return: the validated field specification
    :raises TypeError: if the parameters fail ``_check_field_parameters``
    """
    # This container check concerns the argspec of `field` itself and is
    # therefore kept separate from the checks inside `maybe_parse_user_type`
    # and friends, which concern the valid ways of spelling a type.
    #
    # Several types must arrive in a list, set or tuple. The test uses
    # `isinstance` on the argument, so instances of subclasses of those
    # containers (a `collections.namedtuple` instance, for example) count too.
    if isinstance(type, (list, set, tuple)):
        parse = maybe_parse_many_user_types
    else:
        parse = maybe_parse_user_type
    allowed_types = set(parse(type))

    # Only a caller-supplied callable invariant is wrapped; the default
    # invariant and non-callables are passed on as they are (the latter get
    # rejected by the parameter check below).
    if invariant != PFIELD_NO_INVARIANT and callable(invariant):
        checked_invariant = wrap_invariant(invariant)
    else:
        checked_invariant = invariant

    spec = _PField(type=allowed_types, invariant=checked_invariant, initial=initial,
                   mandatory=mandatory, factory=factory, serializer=serializer)
    _check_field_parameters(spec)
    return spec
132def _check_field_parameters(field):
133 for t in field.type:
134 if not isinstance(t, type) and not isinstance(t, str):
135 raise TypeError('Type parameter expected, not {0}'.format(type(t)))
137 if field.initial is not PFIELD_NO_INITIAL and \
138 not callable(field.initial) and \
139 field.type and not any(isinstance(field.initial, t) for t in field.type):
140 raise TypeError('Initial has invalid type {0}'.format(type(field.initial)))
142 if not callable(field.invariant):
143 raise TypeError('Invariant must be callable')
145 if not callable(field.factory):
146 raise TypeError('Factory must be callable')
148 if not callable(field.serializer):
149 raise TypeError('Serializer must be callable')
class PTypeError(TypeError):
    """
    Raised when trying to assign a value with a type that doesn't match the declared type.

    Attributes:
    source_class -- The class of the record
    field -- Field name
    expected_types  -- Types allowed for the field
    actual_type -- The non matching type
    """

    def __init__(self, source_class, field, expected_types, actual_type, *args, **kwargs):
        # Remaining arguments (typically the message) go to TypeError.
        super().__init__(*args, **kwargs)
        self.source_class, self.field = source_class, field
        self.expected_types, self.actual_type = expected_types, actual_type
# Suffix appended to the generated class name for each supported checked
# sequence base class.
SEQ_FIELD_TYPE_SUFFIXES = {CheckedPVector: "PVector", CheckedPSet: "PSet"}

# Registry of auto-generated sequence field types, keyed by
# (checked_class, item_type). Looked up again when unpickling.
_seq_field_types = {}
def _restore_seq_field_pickle(checked_class, item_type, data):
    """
    Unpickling function for auto-generated PVec/PSet field types.

    Looks the generated type up in ``_seq_field_types`` under
    ``(checked_class, item_type)`` and hands it, with ``data``, to
    ``_restore_pickle``.

    :raises KeyError: if no type was registered for that key
    """
    return _restore_pickle(_seq_field_types[checked_class, item_type], data)
def _types_to_names(types):
    """
    Build a name fragment from an iterable of types.

    Each entry is resolved with ``get_type``; the capitalized ``__name__`` of
    every resolved type is concatenated without a separator.
    """
    capitalized = [get_type(typ).__name__.capitalize() for typ in types]
    return "".join(capitalized)
def _make_seq_field_type(checked_class, item_type, item_invariant):
    """
    Return a subclass of ``checked_class`` restricted to ``item_type``.

    Generated classes are cached in ``_seq_field_types`` under
    ``(checked_class, item_type)`` so that the same class is reused and can be
    found again when unpickling.

    :param checked_class: base class; must be a key of ``SEQ_FIELD_TYPE_SUFFIXES``
    :param item_type: type specification stored as ``__type__``
    :param item_invariant: invariant stored as ``__invariant__``
    :return: the cached or newly created subclass
    """
    # NOTE(review): item_invariant is not part of the cache key, so a cached
    # class is returned as-is even when a different item_invariant is passed.
    # Confirm this is intended.
    cache_key = (checked_class, item_type)
    cached = _seq_field_types.get(cache_key)
    if cached is not None:
        return cached

    class TheType(checked_class):
        __type__ = item_type
        __invariant__ = item_invariant

        def __reduce__(self):
            # Pickle by registry key rather than by class reference.
            return (_restore_seq_field_pickle,
                    (checked_class, item_type, list(self)))

    suffix = SEQ_FIELD_TYPE_SUFFIXES[checked_class]
    TheType.__name__ = "{0}{1}".format(_types_to_names(TheType._checked_types), suffix)
    _seq_field_types[cache_key] = TheType
    return TheType
def _sequence_field(checked_class, item_type, optional, initial,
                    invariant=PFIELD_NO_INVARIANT,
                    item_invariant=PFIELD_NO_INVARIANT):
    """
    Create checked field for either ``PSet`` or ``PVector``.

    :param checked_class: ``CheckedPSet`` or ``CheckedPVector``.
    :param item_type: The required type for the items in the set.
    :param optional: If true, ``None`` can be used as a value for
        this field.
    :param initial: Initial value to pass to factory.
    :param invariant: Pass-through to ``field``.
    :param item_invariant: Invariant attached to the generated item type.

    :return: A ``field`` containing a checked class.
    """
    TheType = _make_seq_field_type(checked_class, item_type, item_invariant)

    if not optional:
        factory = TheType.create
        field_type = TheType
    else:
        def factory(argument, _factory_fields=None, ignore_extra=False):
            # None is a legal value for an optional field; leave it untouched.
            if argument is None:
                return None
            return TheType.create(argument, _factory_fields=_factory_fields, ignore_extra=ignore_extra)
        field_type = optional_type(TheType)

    # The initial value goes through the same factory as assigned values.
    return field(type=field_type,
                 factory=factory, mandatory=True,
                 invariant=invariant,
                 initial=factory(initial))
def pset_field(item_type, optional=False, initial=(),
               invariant=PFIELD_NO_INVARIANT,
               item_invariant=PFIELD_NO_INVARIANT):
    """
    Create checked ``PSet`` field.

    :param item_type: The required type for the items in the set.
    :param optional: If true, ``None`` can be used as a value for
        this field.
    :param initial: Initial value to pass to factory if no value is given
        for the field.
    :param invariant: Pass-through to ``_sequence_field``.
    :param item_invariant: Pass-through to ``_sequence_field``.

    :return: A ``field`` containing a ``CheckedPSet`` of the given type.
    """
    return _sequence_field(checked_class=CheckedPSet,
                           item_type=item_type,
                           optional=optional,
                           initial=initial,
                           invariant=invariant,
                           item_invariant=item_invariant)
def pvector_field(item_type, optional=False, initial=(),
                  invariant=PFIELD_NO_INVARIANT,
                  item_invariant=PFIELD_NO_INVARIANT):
    """
    Create checked ``PVector`` field.

    :param item_type: The required type for the items in the vector.
    :param optional: If true, ``None`` can be used as a value for
        this field.
    :param initial: Initial value to pass to factory if no value is given
        for the field.
    :param invariant: Pass-through to ``_sequence_field``.
    :param item_invariant: Pass-through to ``_sequence_field``.

    :return: A ``field`` containing a ``CheckedPVector`` of the given type.
    """
    return _sequence_field(checked_class=CheckedPVector,
                           item_type=item_type,
                           optional=optional,
                           initial=initial,
                           invariant=invariant,
                           item_invariant=item_invariant)
275_valid = lambda item: (True, "")
278# Global dictionary to hold auto-generated field types: used for unpickling
279_pmap_field_types = {}
def _restore_pmap_field_pickle(key_type, value_type, data):
    """
    Unpickling function for auto-generated PMap field types.

    Looks the generated type up in ``_pmap_field_types`` under
    ``(key_type, value_type)`` and hands it, with ``data``, to
    ``_restore_pickle``.

    :raises KeyError: if no type was registered for that key
    """
    return _restore_pickle(_pmap_field_types[key_type, value_type], data)
def _make_pmap_field_type(key_type, value_type):
    """
    Return a subclass of ``CheckedPMap`` with the given key and value types.

    Generated classes are cached in ``_pmap_field_types`` under
    ``(key_type, value_type)`` so that the same class is reused and can be
    found again when unpickling.

    :param key_type: type specification stored as ``__key_type__``
    :param value_type: type specification stored as ``__value_type__``
    :return: the cached or newly created subclass
    """
    cache_key = (key_type, value_type)
    cached = _pmap_field_types.get(cache_key)
    if cached is not None:
        return cached

    class TheMap(CheckedPMap):
        __key_type__ = key_type
        __value_type__ = value_type

        def __reduce__(self):
            # Pickle by registry key rather than by class reference.
            return (_restore_pmap_field_pickle,
                    (self.__key_type__, self.__value_type__, dict(self)))

    key_names = _types_to_names(TheMap._checked_key_types)
    value_names = _types_to_names(TheMap._checked_value_types)
    TheMap.__name__ = "{0}To{1}PMap".format(key_names, value_names)
    _pmap_field_types[cache_key] = TheMap
    return TheMap
def pmap_field(key_type, value_type, optional=False, invariant=PFIELD_NO_INVARIANT):
    """
    Create a checked ``PMap`` field.

    :param key_type: The required type for the keys of the map.
    :param value_type: The required type for the values of the map.
    :param optional: If true, ``None`` can be used as a value for
        this field.
    :param invariant: Pass-through to ``field``.

    :return: A ``field`` containing a ``CheckedPMap``.
    """
    TheMap = _make_pmap_field_type(key_type, value_type)

    if not optional:
        factory = TheMap.create
    else:
        def factory(argument):
            # None is a legal value for an optional field; leave it untouched.
            if argument is None:
                return None
            return TheMap.create(argument)

    # The field is mandatory and starts out as an empty map.
    return field(mandatory=True, initial=TheMap(),
                 type=optional_type(TheMap) if optional else TheMap,
                 factory=factory, invariant=invariant)