Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/core/serializers/base.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

181 statements  

1""" 

2Module for abstract serializer/unserializer base classes. 

3""" 

4 

5from io import StringIO 

6 

7from django.core.exceptions import ObjectDoesNotExist 

8from django.db import models 

9 

10DEFER_FIELD = object() 

11 

12 

13class SerializerDoesNotExist(KeyError): 

14 """The requested serializer was not found.""" 

15 

16 pass 

17 

18 

19class SerializationError(Exception): 

20 """Something bad happened during serialization.""" 

21 

22 pass 

23 

24 

25class DeserializationError(Exception): 

26 """Something bad happened during deserialization.""" 

27 

28 @classmethod 

29 def WithData(cls, original_exc, model, fk, field_value): 

30 """ 

31 Factory method for creating a deserialization error which has a more 

32 explanatory message. 

33 """ 

34 return cls( 

35 "%s: (%s:pk=%s) field_value was '%s'" 

36 % (original_exc, model, fk, field_value) 

37 ) 

38 

39 

40class M2MDeserializationError(Exception): 

41 """Something bad happened during deserialization of a ManyToManyField.""" 

42 

43 def __init__(self, original_exc, pk): 

44 self.original_exc = original_exc 

45 self.pk = pk 

46 

47 

48class ProgressBar: 

49 progress_width = 75 

50 

51 def __init__(self, output, total_count): 

52 self.output = output 

53 self.total_count = total_count 

54 self.prev_done = 0 

55 

56 def update(self, count): 

57 if not self.output: 

58 return 

59 perc = count * 100 // self.total_count 

60 done = perc * self.progress_width // 100 

61 if self.prev_done >= done: 

62 return 

63 self.prev_done = done 

64 cr = "" if self.total_count == 1 else "\r" 

65 self.output.write( 

66 cr + "[" + "." * done + " " * (self.progress_width - done) + "]" 

67 ) 

68 if done == self.progress_width: 

69 self.output.write("\n") 

70 self.output.flush() 

71 

72 

73class Serializer: 

74 """ 

75 Abstract serializer base class. 

76 """ 

77 

78 # Indicates if the implemented serializer is only available for 

79 # internal Django use. 

80 internal_use_only = False 

81 progress_class = ProgressBar 

82 stream_class = StringIO 

83 

84 def serialize( 

85 self, 

86 queryset, 

87 *, 

88 stream=None, 

89 fields=None, 

90 use_natural_foreign_keys=False, 

91 use_natural_primary_keys=False, 

92 progress_output=None, 

93 object_count=0, 

94 **options, 

95 ): 

96 """ 

97 Serialize a queryset. 

98 """ 

99 self.options = options 

100 

101 self.stream = stream if stream is not None else self.stream_class() 

102 self.selected_fields = fields 

103 self.use_natural_foreign_keys = use_natural_foreign_keys 

104 self.use_natural_primary_keys = use_natural_primary_keys 

105 progress_bar = self.progress_class(progress_output, object_count) 

106 

107 self.start_serialization() 

108 self.first = True 

109 for count, obj in enumerate(queryset, start=1): 

110 self.start_object(obj) 

111 # Use the concrete parent class' _meta instead of the object's _meta 

112 # This is to avoid local_fields problems for proxy models. Refs #17717. 

113 concrete_model = obj._meta.concrete_model 

114 # When using natural primary keys, retrieve the pk field of the 

115 # parent for multi-table inheritance child models. That field must 

116 # be serialized, otherwise deserialization isn't possible. 

117 if self.use_natural_primary_keys: 

118 pk = concrete_model._meta.pk 

119 pk_parent = ( 

120 pk if pk.remote_field and pk.remote_field.parent_link else None 

121 ) 

122 else: 

123 pk_parent = None 

124 for field in concrete_model._meta.local_fields: 

125 if field.serialize or field is pk_parent: 

126 if field.remote_field is None: 

127 if ( 

128 self.selected_fields is None 

129 or field.attname in self.selected_fields 

130 ): 

131 self.handle_field(obj, field) 

132 else: 

133 if ( 

134 self.selected_fields is None 

135 or field.attname[:-3] in self.selected_fields 

136 ): 

137 self.handle_fk_field(obj, field) 

138 for field in concrete_model._meta.local_many_to_many: 

139 if field.serialize: 

140 if ( 

141 self.selected_fields is None 

142 or field.attname in self.selected_fields 

143 ): 

144 self.handle_m2m_field(obj, field) 

145 self.end_object(obj) 

146 progress_bar.update(count) 

147 self.first = self.first and False 

148 self.end_serialization() 

149 return self.getvalue() 

150 

151 def start_serialization(self): 

152 """ 

153 Called when serializing of the queryset starts. 

154 """ 

155 raise NotImplementedError( 

156 "subclasses of Serializer must provide a start_serialization() method" 

157 ) 

158 

159 def end_serialization(self): 

160 """ 

161 Called when serializing of the queryset ends. 

162 """ 

163 pass 

164 

165 def start_object(self, obj): 

166 """ 

167 Called when serializing of an object starts. 

168 """ 

169 raise NotImplementedError( 

170 "subclasses of Serializer must provide a start_object() method" 

171 ) 

172 

173 def end_object(self, obj): 

174 """ 

175 Called when serializing of an object ends. 

176 """ 

177 pass 

178 

179 def handle_field(self, obj, field): 

180 """ 

181 Called to handle each individual (non-relational) field on an object. 

182 """ 

183 raise NotImplementedError( 

184 "subclasses of Serializer must provide a handle_field() method" 

185 ) 

186 

187 def handle_fk_field(self, obj, field): 

188 """ 

189 Called to handle a ForeignKey field. 

190 """ 

191 raise NotImplementedError( 

192 "subclasses of Serializer must provide a handle_fk_field() method" 

193 ) 

194 

195 def handle_m2m_field(self, obj, field): 

196 """ 

197 Called to handle a ManyToManyField. 

198 """ 

199 raise NotImplementedError( 

200 "subclasses of Serializer must provide a handle_m2m_field() method" 

201 ) 

202 

203 def getvalue(self): 

204 """ 

205 Return the fully serialized queryset (or None if the output stream is 

206 not seekable). 

207 """ 

208 if callable(getattr(self.stream, "getvalue", None)): 

209 return self.stream.getvalue() 

210 

211 

212class Deserializer: 

213 """ 

214 Abstract base deserializer class. 

215 """ 

216 

217 def __init__(self, stream_or_string, **options): 

218 """ 

219 Init this serializer given a stream or a string 

220 """ 

221 self.options = options 

222 if isinstance(stream_or_string, str): 

223 self.stream = StringIO(stream_or_string) 

224 else: 

225 self.stream = stream_or_string 

226 

227 def __iter__(self): 

228 return self 

229 

230 def __next__(self): 

231 """Iteration interface -- return the next item in the stream""" 

232 raise NotImplementedError( 

233 "subclasses of Deserializer must provide a __next__() method" 

234 ) 

235 

236 

237class DeserializedObject: 

238 """ 

239 A deserialized model. 

240 

241 Basically a container for holding the pre-saved deserialized data along 

242 with the many-to-many data saved with the object. 

243 

244 Call ``save()`` to save the object (with the many-to-many data) to the 

245 database; call ``save(save_m2m=False)`` to save just the object fields 

246 (and not touch the many-to-many stuff.) 

247 """ 

248 

249 def __init__(self, obj, m2m_data=None, deferred_fields=None): 

250 self.object = obj 

251 self.m2m_data = m2m_data 

252 self.deferred_fields = deferred_fields 

253 

254 def __repr__(self): 

255 return "<%s: %s(pk=%s)>" % ( 

256 self.__class__.__name__, 

257 self.object._meta.label, 

258 self.object.pk, 

259 ) 

260 

261 def save(self, save_m2m=True, using=None, **kwargs): 

262 # Call save on the Model baseclass directly. This bypasses any 

263 # model-defined save. The save is also forced to be raw. 

264 # raw=True is passed to any pre/post_save signals. 

265 models.Model.save_base(self.object, using=using, raw=True, **kwargs) 

266 if self.m2m_data and save_m2m: 

267 for accessor_name, object_list in self.m2m_data.items(): 

268 getattr(self.object, accessor_name).set(object_list) 

269 

270 # prevent a second (possibly accidental) call to save() from saving 

271 # the m2m data twice. 

272 self.m2m_data = None 

273 

274 def save_deferred_fields(self, using=None): 

275 self.m2m_data = {} 

276 for field, field_value in self.deferred_fields.items(): 

277 opts = self.object._meta 

278 label = opts.app_label + "." + opts.model_name 

279 if isinstance(field.remote_field, models.ManyToManyRel): 

280 try: 

281 values = deserialize_m2m_values( 

282 field, field_value, using, handle_forward_references=False 

283 ) 

284 except M2MDeserializationError as e: 

285 raise DeserializationError.WithData( 

286 e.original_exc, label, self.object.pk, e.pk 

287 ) 

288 self.m2m_data[field.name] = values 

289 elif isinstance(field.remote_field, models.ManyToOneRel): 

290 try: 

291 value = deserialize_fk_value( 

292 field, field_value, using, handle_forward_references=False 

293 ) 

294 except Exception as e: 

295 raise DeserializationError.WithData( 

296 e, label, self.object.pk, field_value 

297 ) 

298 setattr(self.object, field.attname, value) 

299 self.save() 

300 

301 

302def build_instance(Model, data, db): 

303 """ 

304 Build a model instance. 

305 

306 If the model instance doesn't have a primary key and the model supports 

307 natural keys, try to retrieve it from the database. 

308 """ 

309 default_manager = Model._meta.default_manager 

310 pk = data.get(Model._meta.pk.attname) 

311 if ( 

312 pk is None 

313 and hasattr(default_manager, "get_by_natural_key") 

314 and hasattr(Model, "natural_key") 

315 ): 

316 obj = Model(**data) 

317 obj._state.db = db 

318 natural_key = obj.natural_key() 

319 try: 

320 data[Model._meta.pk.attname] = Model._meta.pk.to_python( 

321 default_manager.db_manager(db).get_by_natural_key(*natural_key).pk 

322 ) 

323 except Model.DoesNotExist: 

324 pass 

325 return Model(**data) 

326 

327 

328def deserialize_m2m_values(field, field_value, using, handle_forward_references): 

329 model = field.remote_field.model 

330 if hasattr(model._default_manager, "get_by_natural_key"): 

331 

332 def m2m_convert(value): 

333 if hasattr(value, "__iter__") and not isinstance(value, str): 

334 return ( 

335 model._default_manager.db_manager(using) 

336 .get_by_natural_key(*value) 

337 .pk 

338 ) 

339 else: 

340 return model._meta.pk.to_python(value) 

341 

342 else: 

343 

344 def m2m_convert(v): 

345 return model._meta.pk.to_python(v) 

346 

347 try: 

348 pks_iter = iter(field_value) 

349 except TypeError as e: 

350 raise M2MDeserializationError(e, field_value) 

351 try: 

352 values = [] 

353 for pk in pks_iter: 

354 values.append(m2m_convert(pk)) 

355 return values 

356 except Exception as e: 

357 if isinstance(e, ObjectDoesNotExist) and handle_forward_references: 

358 return DEFER_FIELD 

359 else: 

360 raise M2MDeserializationError(e, pk) 

361 

362 

363def deserialize_fk_value(field, field_value, using, handle_forward_references): 

364 if field_value is None: 

365 return None 

366 model = field.remote_field.model 

367 default_manager = model._default_manager 

368 field_name = field.remote_field.field_name 

369 if ( 

370 hasattr(default_manager, "get_by_natural_key") 

371 and hasattr(field_value, "__iter__") 

372 and not isinstance(field_value, str) 

373 ): 

374 try: 

375 obj = default_manager.db_manager(using).get_by_natural_key(*field_value) 

376 except ObjectDoesNotExist: 

377 if handle_forward_references: 

378 return DEFER_FIELD 

379 else: 

380 raise 

381 value = getattr(obj, field_name) 

382 # If this is a natural foreign key to an object that has a FK/O2O as 

383 # the foreign key, use the FK value. 

384 if model._meta.pk.remote_field: 

385 value = value.pk 

386 return value 

387 return model._meta.get_field(field_name).to_python(field_value)