1"""
2Base class for the internal managers. Both BlockManager and ArrayManager
3inherit from this class.
4"""
5from __future__ import annotations
6
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Literal,
11 cast,
12 final,
13)
14
15import numpy as np
16
17from pandas._config import (
18 using_copy_on_write,
19 warn_copy_on_write,
20)
21
22from pandas._libs import (
23 algos as libalgos,
24 lib,
25)
26from pandas.errors import AbstractMethodError
27from pandas.util._validators import validate_bool_kwarg
28
29from pandas.core.dtypes.cast import (
30 find_common_type,
31 np_can_hold_element,
32)
33from pandas.core.dtypes.dtypes import (
34 ExtensionDtype,
35 SparseDtype,
36)
37
38from pandas.core.base import PandasObject
39from pandas.core.construction import extract_array
40from pandas.core.indexes.api import (
41 Index,
42 default_index,
43)
44
45if TYPE_CHECKING:
46 from pandas._typing import (
47 ArrayLike,
48 AxisInt,
49 DtypeObj,
50 Self,
51 Shape,
52 )
53
54
class _AlreadyWarned:
    """
    Mutable flag passed from the manager level down to the block level.

    It exists so that a warning is emitted only once: a block method can set
    ``warned_already`` on the shared instance instead of returning a value,
    which keeps the block-method interface consistent. This is only a
    temporary solution for CoW warnings.
    """

    def __init__(self) -> None:
        # Starts out False; flipped to True by whoever records the warning.
        self.warned_already: bool = False
63
64
65class DataManager(PandasObject):
66 # TODO share more methods/attributes
67
68 axes: list[Index]
69
70 @property
71 def items(self) -> Index:
72 raise AbstractMethodError(self)
73
74 @final
75 def __len__(self) -> int:
76 return len(self.items)
77
78 @property
79 def ndim(self) -> int:
80 return len(self.axes)
81
82 @property
83 def shape(self) -> Shape:
84 return tuple(len(ax) for ax in self.axes)
85
86 @final
87 def _validate_set_axis(self, axis: AxisInt, new_labels: Index) -> None:
88 # Caller is responsible for ensuring we have an Index object.
89 old_len = len(self.axes[axis])
90 new_len = len(new_labels)
91
92 if axis == 1 and len(self.items) == 0:
93 # If we are setting the index on a DataFrame with no columns,
94 # it is OK to change the length.
95 pass
96
97 elif new_len != old_len:
98 raise ValueError(
99 f"Length mismatch: Expected axis has {old_len} elements, new "
100 f"values have {new_len} elements"
101 )
102
103 def reindex_indexer(
104 self,
105 new_axis,
106 indexer,
107 axis: AxisInt,
108 fill_value=None,
109 allow_dups: bool = False,
110 copy: bool = True,
111 only_slice: bool = False,
112 ) -> Self:
113 raise AbstractMethodError(self)
114
115 @final
116 def reindex_axis(
117 self,
118 new_index: Index,
119 axis: AxisInt,
120 fill_value=None,
121 only_slice: bool = False,
122 ) -> Self:
123 """
124 Conform data manager to new index.
125 """
126 new_index, indexer = self.axes[axis].reindex(new_index)
127
128 return self.reindex_indexer(
129 new_index,
130 indexer,
131 axis=axis,
132 fill_value=fill_value,
133 copy=False,
134 only_slice=only_slice,
135 )
136
137 def _equal_values(self, other: Self) -> bool:
138 """
139 To be implemented by the subclasses. Only check the column values
140 assuming shape and indexes have already been checked.
141 """
142 raise AbstractMethodError(self)
143
144 @final
145 def equals(self, other: object) -> bool:
146 """
147 Implementation for DataFrame.equals
148 """
149 if not isinstance(other, type(self)):
150 return False
151
152 self_axes, other_axes = self.axes, other.axes
153 if len(self_axes) != len(other_axes):
154 return False
155 if not all(ax1.equals(ax2) for ax1, ax2 in zip(self_axes, other_axes)):
156 return False
157
158 return self._equal_values(other)
159
160 def apply(
161 self,
162 f,
163 align_keys: list[str] | None = None,
164 **kwargs,
165 ) -> Self:
166 raise AbstractMethodError(self)
167
168 def apply_with_block(
169 self,
170 f,
171 align_keys: list[str] | None = None,
172 **kwargs,
173 ) -> Self:
174 raise AbstractMethodError(self)
175
176 @final
177 def isna(self, func) -> Self:
178 return self.apply("apply", func=func)
179
180 @final
181 def fillna(self, value, limit: int | None, inplace: bool, downcast) -> Self:
182 if limit is not None:
183 # Do this validation even if we go through one of the no-op paths
184 limit = libalgos.validate_limit(None, limit=limit)
185
186 return self.apply_with_block(
187 "fillna",
188 value=value,
189 limit=limit,
190 inplace=inplace,
191 downcast=downcast,
192 using_cow=using_copy_on_write(),
193 already_warned=_AlreadyWarned(),
194 )
195
196 @final
197 def where(self, other, cond, align: bool) -> Self:
198 if align:
199 align_keys = ["other", "cond"]
200 else:
201 align_keys = ["cond"]
202 other = extract_array(other, extract_numpy=True)
203
204 return self.apply_with_block(
205 "where",
206 align_keys=align_keys,
207 other=other,
208 cond=cond,
209 using_cow=using_copy_on_write(),
210 )
211
212 @final
213 def putmask(self, mask, new, align: bool = True, warn: bool = True) -> Self:
214 if align:
215 align_keys = ["new", "mask"]
216 else:
217 align_keys = ["mask"]
218 new = extract_array(new, extract_numpy=True)
219
220 already_warned = None
221 if warn_copy_on_write():
222 already_warned = _AlreadyWarned()
223 if not warn:
224 already_warned.warned_already = True
225
226 return self.apply_with_block(
227 "putmask",
228 align_keys=align_keys,
229 mask=mask,
230 new=new,
231 using_cow=using_copy_on_write(),
232 already_warned=already_warned,
233 )
234
235 @final
236 def round(self, decimals: int, using_cow: bool = False) -> Self:
237 return self.apply_with_block(
238 "round",
239 decimals=decimals,
240 using_cow=using_cow,
241 )
242
243 @final
244 def replace(self, to_replace, value, inplace: bool) -> Self:
245 inplace = validate_bool_kwarg(inplace, "inplace")
246 # NDFrame.replace ensures the not-is_list_likes here
247 assert not lib.is_list_like(to_replace)
248 assert not lib.is_list_like(value)
249 return self.apply_with_block(
250 "replace",
251 to_replace=to_replace,
252 value=value,
253 inplace=inplace,
254 using_cow=using_copy_on_write(),
255 already_warned=_AlreadyWarned(),
256 )
257
258 @final
259 def replace_regex(self, **kwargs) -> Self:
260 return self.apply_with_block(
261 "_replace_regex",
262 **kwargs,
263 using_cow=using_copy_on_write(),
264 already_warned=_AlreadyWarned(),
265 )
266
267 @final
268 def replace_list(
269 self,
270 src_list: list[Any],
271 dest_list: list[Any],
272 inplace: bool = False,
273 regex: bool = False,
274 ) -> Self:
275 """do a list replace"""
276 inplace = validate_bool_kwarg(inplace, "inplace")
277
278 bm = self.apply_with_block(
279 "replace_list",
280 src_list=src_list,
281 dest_list=dest_list,
282 inplace=inplace,
283 regex=regex,
284 using_cow=using_copy_on_write(),
285 already_warned=_AlreadyWarned(),
286 )
287 bm._consolidate_inplace()
288 return bm
289
290 def interpolate(self, inplace: bool, **kwargs) -> Self:
291 return self.apply_with_block(
292 "interpolate",
293 inplace=inplace,
294 **kwargs,
295 using_cow=using_copy_on_write(),
296 already_warned=_AlreadyWarned(),
297 )
298
299 def pad_or_backfill(self, inplace: bool, **kwargs) -> Self:
300 return self.apply_with_block(
301 "pad_or_backfill",
302 inplace=inplace,
303 **kwargs,
304 using_cow=using_copy_on_write(),
305 already_warned=_AlreadyWarned(),
306 )
307
308 def shift(self, periods: int, fill_value) -> Self:
309 if fill_value is lib.no_default:
310 fill_value = None
311
312 return self.apply_with_block("shift", periods=periods, fill_value=fill_value)
313
314 # --------------------------------------------------------------------
315 # Consolidation: No-ops for all but BlockManager
316
317 def is_consolidated(self) -> bool:
318 return True
319
320 def consolidate(self) -> Self:
321 return self
322
323 def _consolidate_inplace(self) -> None:
324 return
325
326
class SingleDataManager(DataManager):
    """
    Manager variant whose ``ndim`` is always 1 and which exposes a single
    backing array through ``array``.
    """

    @property
    def ndim(self) -> Literal[1]:
        """Always 1."""
        return 1

    @final
    @property
    def array(self) -> ArrayLike:
        """
        Quick access to the backing array of the Block or SingleArrayManager.
        """
        # error: "SingleDataManager" has no attribute "arrays"; maybe "array"
        backing = self.arrays  # type: ignore[attr-defined]
        return backing[0]

    def setitem_inplace(self, indexer, value, warn: bool = True) -> None:
        """
        Set values with indexer.

        For Single[Block/Array]Manager, this backs s[indexer] = value

        This is an inplace version of `setitem()`, mutating the manager/values
        in place, not returning a new Manager (and Block), and thus never changing
        the dtype.

        ``warn`` is accepted but not used by this implementation.
        """
        target = self.array

        # EAs will do this validation in their own __setitem__ methods.
        if isinstance(target, np.ndarray):
            # Note: checking for ndarray instead of np.dtype means we exclude
            # dt64/td64, which do their own validation.
            value = np_can_hold_element(target.dtype, value)

        is_len1_vector = (
            isinstance(value, np.ndarray) and value.ndim == 1 and len(value) == 1
        )
        if is_len1_vector:
            # NumPy 1.25 deprecation: https://github.com/numpy/numpy/pull/10615
            value = value[0, ...]

        target[indexer] = value

    def grouped_reduce(self, func):
        """
        Apply ``func`` to the backing array and wrap the result in a new
        manager of the same type, indexed by a default index of matching
        length.
        """
        reduced = func(self.array)
        new_index = default_index(len(reduced))
        return type(self).from_array(reduced, new_index)

    @classmethod
    def from_array(cls, arr: ArrayLike, index: Index):
        """Abstract: must be supplied by the subclass."""
        raise AbstractMethodError(cls)
376
377
def interleaved_dtype(dtypes: list[DtypeObj]) -> DtypeObj | None:
    """
    Find the common dtype for `dtypes`.

    Parameters
    ----------
    dtypes : List[DtypeObj]

    Returns
    -------
    dtype : np.dtype, ExtensionDtype, or None
        None is returned when `dtypes` is empty.
    """
    # An empty input has no common dtype; find_common_type is not consulted.
    return find_common_type(dtypes) if len(dtypes) else None
395
396
397def ensure_np_dtype(dtype: DtypeObj) -> np.dtype:
398 # TODO: https://github.com/pandas-dev/pandas/issues/22791
399 # Give EAs some input on what happens here. Sparse needs this.
400 if isinstance(dtype, SparseDtype):
401 dtype = dtype.subtype
402 dtype = cast(np.dtype, dtype)
403 elif isinstance(dtype, ExtensionDtype):
404 dtype = np.dtype("object")
405 elif dtype == np.dtype(str):
406 dtype = np.dtype("object")
407 return dtype