Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyrsistent/_precord.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Any
2from pyrsistent._checked_types import CheckedType, _restore_pickle, InvariantException, store_invariants
3from pyrsistent._field_common import (
4 set_fields, check_type, is_field_ignore_extra_complaint, PFIELD_NO_INITIAL, serialize, check_global_invariants
5)
6from pyrsistent._pmap import PMap, pmap
9class _PRecordMeta(type):
10 def __new__(mcs, name, bases, dct):
11 set_fields(dct, bases, name='_precord_fields')
12 store_invariants(dct, bases, '_precord_invariants', '__invariant__')
13 dct['_precord_mandatory_fields'] = \
14 set(name for name, field in dct['_precord_fields'].items() if field.mandatory)
15 dct['_precord_initial_values'] = \
16 dict((k, field.initial) for k, field in dct['_precord_fields'].items() if field.initial is not PFIELD_NO_INITIAL)
17 dct['__slots__'] = ()
18 return super(_PRecordMeta, mcs).__new__(mcs, name, bases, dct)
21class PRecord(PMap[str, Any], CheckedType, metaclass=_PRecordMeta):
22 """
23 A PRecord is a PMap with a fixed set of specified fields. Records are declared as python classes inheriting
24 from PRecord. Because it is a PMap it has full support for all Mapping methods such as iteration and element
25 access using subscript notation.
27 More documentation and examples of PRecord usage is available at https://github.com/tobgu/pyrsistent
28 """
29 def __new__(cls, **kwargs):
30 # Hack total! If these two special attributes exist that means we can create
31 # ourselves. Otherwise we need to go through the Evolver to create the structures
32 # for us.
33 if '_precord_size' in kwargs and '_precord_buckets' in kwargs:
34 return super(PRecord, cls).__new__(cls, kwargs['_precord_size'], kwargs['_precord_buckets'])
36 factory_fields = kwargs.pop('_factory_fields', None)
37 ignore_extra = kwargs.pop('_ignore_extra', False)
38 initial_values = kwargs
39 if cls._precord_initial_values:
40 initial_values = dict((k, v() if callable(v) else v)
41 for k, v in cls._precord_initial_values.items())
42 initial_values.update(kwargs)
44 e = _PRecordEvolver(cls, pmap(pre_size=len(cls._precord_fields)), _factory_fields=factory_fields, _ignore_extra=ignore_extra)
45 for k, v in initial_values.items():
46 e[k] = v
48 return e.persistent()
50 def set(self, *args, **kwargs):
51 """
52 Set a field in the record. This set function differs slightly from that in the PMap
53 class. First of all it accepts key-value pairs. Second it accepts multiple key-value
54 pairs to perform one, atomic, update of multiple fields.
55 """
56 # The PRecord set() can accept kwargs since all fields that have been declared are
57 # valid python identifiers. Also allow multiple fields to be set in one operation.
58 if args:
59 return super(PRecord, self).set(args[0], args[1])
61 return self.update(kwargs)
63 def evolver(self):
64 """
65 Returns an evolver of this object.
66 """
67 return _PRecordEvolver(self.__class__, self)
69 def __repr__(self):
70 return "{0}({1})".format(self.__class__.__name__,
71 ', '.join('{0}={1}'.format(k, repr(v)) for k, v in self.items()))
73 @classmethod
74 def create(cls, kwargs, _factory_fields=None, ignore_extra=False):
75 """
76 Factory method. Will create a new PRecord of the current type and assign the values
77 specified in kwargs.
79 :param ignore_extra: A boolean which when set to True will ignore any keys which appear in kwargs that are not
80 in the set of fields on the PRecord.
81 """
82 if isinstance(kwargs, cls):
83 return kwargs
85 if ignore_extra:
86 kwargs = {k: kwargs[k] for k in cls._precord_fields if k in kwargs}
88 return cls(_factory_fields=_factory_fields, _ignore_extra=ignore_extra, **kwargs)
90 def __reduce__(self):
91 # Pickling support
92 return _restore_pickle, (self.__class__, dict(self),)
94 def serialize(self, format=None):
95 """
96 Serialize the current PRecord using custom serializer functions for fields where
97 such have been supplied.
98 """
99 return dict((k, serialize(self._precord_fields[k].serializer, format, v)) for k, v in self.items())
102class _PRecordEvolver(PMap._Evolver):
103 __slots__ = ('_destination_cls', '_invariant_error_codes', '_missing_fields', '_factory_fields', '_ignore_extra')
105 def __init__(self, cls, original_pmap, _factory_fields=None, _ignore_extra=False):
106 super(_PRecordEvolver, self).__init__(original_pmap)
107 self._destination_cls = cls
108 self._invariant_error_codes = []
109 self._missing_fields = []
110 self._factory_fields = _factory_fields
111 self._ignore_extra = _ignore_extra
113 def __setitem__(self, key, original_value):
114 self.set(key, original_value)
116 def set(self, key, original_value):
117 field = self._destination_cls._precord_fields.get(key)
118 if field:
119 if self._factory_fields is None or field in self._factory_fields:
120 try:
121 if is_field_ignore_extra_complaint(CheckedType, field, self._ignore_extra):
122 value = field.factory(original_value, ignore_extra=self._ignore_extra)
123 else:
124 value = field.factory(original_value)
125 except InvariantException as e:
126 self._invariant_error_codes += e.invariant_errors
127 self._missing_fields += e.missing_fields
128 return self
129 else:
130 value = original_value
132 check_type(self._destination_cls, field, key, value)
133 is_ok, error_code = field.invariant(value)
134 if not is_ok:
135 self._invariant_error_codes.append(error_code)
137 return super(_PRecordEvolver, self).set(key, value)
138 else:
139 raise AttributeError("'{0}' is not among the specified fields for {1}".format(key, self._destination_cls.__name__))
141 def persistent(self):
142 cls = self._destination_cls
143 is_dirty = self.is_dirty()
144 pm = super(_PRecordEvolver, self).persistent()
146 if is_dirty or not isinstance(pm, cls):
147 result = cls(_precord_buckets=pm._buckets, _precord_size=pm._size)
148 else:
149 result = pm
151 if cls._precord_mandatory_fields:
152 self._missing_fields += tuple('{0}.{1}'.format(cls.__name__, f) for f
153 in (cls._precord_mandatory_fields - set(result.keys())))
155 if self._invariant_error_codes or self._missing_fields:
156 raise InvariantException(tuple(self._invariant_error_codes), tuple(self._missing_fields),
157 'Field invariant failed')
159 check_global_invariants(result, cls._precord_invariants)
161 return result