Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pandas/io/sql.py: 16%
1"""
2Collection of query wrappers / abstractions to both facilitate data
3retrieval and to reduce dependency on DB-specific API.
4"""
6from __future__ import annotations
8from abc import (
9 ABC,
10 abstractmethod,
11)
12from contextlib import (
13 ExitStack,
14 contextmanager,
15)
16from datetime import (
17 date,
18 datetime,
19 time,
20)
21from functools import partial
22import re
23from typing import (
24 TYPE_CHECKING,
25 Any,
26 Callable,
27 Literal,
28 cast,
29 overload,
30)
31import warnings
33import numpy as np
35from pandas._config import using_pyarrow_string_dtype
37from pandas._libs import lib
38from pandas.compat._optional import import_optional_dependency
39from pandas.errors import (
40 AbstractMethodError,
41 DatabaseError,
42)
43from pandas.util._exceptions import find_stack_level
44from pandas.util._validators import check_dtype_backend
46from pandas.core.dtypes.common import (
47 is_dict_like,
48 is_list_like,
49)
50from pandas.core.dtypes.dtypes import (
51 ArrowDtype,
52 DatetimeTZDtype,
53)
54from pandas.core.dtypes.missing import isna
56from pandas import get_option
57from pandas.core.api import (
58 DataFrame,
59 Series,
60)
61from pandas.core.arrays import ArrowExtensionArray
62from pandas.core.base import PandasObject
63import pandas.core.common as com
64from pandas.core.common import maybe_make_list
65from pandas.core.internals.construction import convert_object_array
66from pandas.core.tools.datetimes import to_datetime
68if TYPE_CHECKING:
69 from collections.abc import (
70 Iterator,
71 Mapping,
72 )
74 from sqlalchemy import Table
75 from sqlalchemy.sql.expression import (
76 Select,
77 TextClause,
78 )
80 from pandas._typing import (
81 DateTimeErrorChoices,
82 DtypeArg,
83 DtypeBackend,
84 IndexLabel,
85 Self,
86 )
88 from pandas import Index
90# -----------------------------------------------------------------------------
91# -- Helper functions
94def _process_parse_dates_argument(parse_dates):
95 """Process parse_dates argument for read_sql functions"""
96 # handle non-list entries for parse_dates gracefully
97 if parse_dates is True or parse_dates is None or parse_dates is False:
98 parse_dates = []
100 elif not hasattr(parse_dates, "__iter__"):
101 parse_dates = [parse_dates]
102 return parse_dates
def _handle_date_column(
    col, utc: bool = False, format: str | dict[str, Any] | None = None
):
    if isinstance(format, dict):
        # GH35185 Allow custom error values in parse_dates argument of
        # read_sql like functions.
        # Format can take on custom to_datetime argument values such as
        # {"errors": "coerce"} or {"dayfirst": True}
        error: DateTimeErrorChoices = format.pop("errors", None) or "ignore"
        if error == "ignore":
            try:
                return to_datetime(col, **format)
            except (TypeError, ValueError):
                # TODO: not reached 2023-10-27; needed?
                return col
        return to_datetime(col, errors=error, **format)
    else:
        # Allow passing of formatting string for integers
        # GH17855
        if format is None and (
            issubclass(col.dtype.type, np.floating)
            or issubclass(col.dtype.type, np.integer)
        ):
            format = "s"
        if format in ["D", "d", "h", "m", "s", "ms", "us", "ns"]:
            return to_datetime(col, errors="coerce", unit=format, utc=utc)
        elif isinstance(col.dtype, DatetimeTZDtype):
            # coerce to UTC timezone
            # GH11216
            return to_datetime(col, utc=True)
        else:
            return to_datetime(col, errors="coerce", format=format, utc=utc)
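# Illustrative sketch (not part of pandas): how a ``parse_dates`` format dict
# flows through ``_handle_date_column``. The dict is forwarded to
# ``to_datetime`` after the "errors" key is popped; the values below are
# hypothetical.
#
# >>> col = Series(["10/11/12", "12/11/10"])
# >>> _handle_date_column(col, format={"format": "%d/%m/%y", "errors": "coerce"})
# 0   2012-11-10
# 1   2010-11-12
# dtype: datetime64[ns]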
def _parse_date_columns(data_frame, parse_dates):
    """
    Force non-datetime columns to be read as such.
    Supports both string formatted and integer timestamp columns.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # we want to coerce datetime64_tz dtypes for now to UTC
    # we could in theory do a 'nice' conversion from a FixedOffset tz
    # GH11216
    for i, (col_name, df_col) in enumerate(data_frame.items()):
        if isinstance(df_col.dtype, DatetimeTZDtype) or col_name in parse_dates:
            try:
                fmt = parse_dates[col_name]
            except (KeyError, TypeError):
                fmt = None
            data_frame.isetitem(i, _handle_date_column(df_col, format=fmt))

    return data_frame
def _convert_arrays_to_dataframe(
    data,
    columns,
    coerce_float: bool = True,
    dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
) -> DataFrame:
    content = lib.to_object_array_tuples(data)
    arrays = convert_object_array(
        list(content.T),
        dtype=None,
        coerce_float=coerce_float,
        dtype_backend=dtype_backend,
    )
    if dtype_backend == "pyarrow":
        pa = import_optional_dependency("pyarrow")

        result_arrays = []
        for arr in arrays:
            pa_array = pa.array(arr, from_pandas=True)
            if arr.dtype == "string":
                # TODO: Arrow still infers string arrays as regular strings instead
                # of large_string, which is what we preserve everywhere else for
                # dtype_backend="pyarrow". We may want to reconsider this
                pa_array = pa_array.cast(pa.string())
            result_arrays.append(ArrowExtensionArray(pa_array))
        arrays = result_arrays  # type: ignore[assignment]
    if arrays:
        df = DataFrame(dict(zip(list(range(len(columns))), arrays)))
        df.columns = columns
        return df
    else:
        return DataFrame(columns=columns)
def _wrap_result(
    data,
    columns,
    index_col=None,
    coerce_float: bool = True,
    parse_dates=None,
    dtype: DtypeArg | None = None,
    dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
):
    """Wrap result set of a SQLAlchemy query in a DataFrame."""
    frame = _convert_arrays_to_dataframe(data, columns, coerce_float, dtype_backend)

    if dtype:
        frame = frame.astype(dtype)

    frame = _parse_date_columns(frame, parse_dates)

    if index_col is not None:
        frame = frame.set_index(index_col)

    return frame
def _wrap_result_adbc(
    df: DataFrame,
    *,
    index_col=None,
    parse_dates=None,
    dtype: DtypeArg | None = None,
    dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
) -> DataFrame:
    """Wrap result set of an ADBC query in a DataFrame."""
    if dtype:
        df = df.astype(dtype)

    df = _parse_date_columns(df, parse_dates)

    if index_col is not None:
        df = df.set_index(index_col)

    return df
def execute(sql, con, params=None):
    """
    Execute the given SQL query using the provided connection object.

    Parameters
    ----------
    sql : string
        SQL query to be executed.
    con : SQLAlchemy connection or sqlite3 connection
        If a DBAPI2 object, only sqlite3 is supported.
    params : list or tuple, optional, default: None
        List of parameters to pass to execute method.

    Returns
    -------
    Results Iterable
    """
    warnings.warn(
        "`pandas.io.sql.execute` is deprecated and "
        "will be removed in a future version.",
        FutureWarning,
        stacklevel=find_stack_level(),
    )  # GH50185
    sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Engine)):
        raise TypeError("pandas.io.sql.execute requires a connection")  # GH50185
    with pandasSQL_builder(con, need_transaction=True) as pandas_sql:
        return pandas_sql.execute(sql, params)
# -----------------------------------------------------------------------------
# -- Read and write to DataFrames


@overload
def read_sql_table(
    table_name: str,
    con,
    schema=...,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    columns: list[str] | None = ...,
    chunksize: None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> DataFrame:
    ...


@overload
def read_sql_table(
    table_name: str,
    con,
    schema=...,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    columns: list[str] | None = ...,
    chunksize: int = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> Iterator[DataFrame]:
    ...
def read_sql_table(
    table_name: str,
    con,
    schema: str | None = None,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    parse_dates: list[str] | dict[str, str] | None = None,
    columns: list[str] | None = None,
    chunksize: int | None = None,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL database table into a DataFrame.

    Given a table name and a SQLAlchemy connectable, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : SQLAlchemy connectable or str
        A database URI could be provided as str.
        SQLite DBAPI connection mode not supported.
    schema : str, default None
        Name of SQL schema in database to query (if database flavor
        supports this). Uses default schema if None (default).
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default None
        List of column names to select from SQL table.
    chunksize : int, default None
        If specified, returns an iterator where `chunksize` is the number of
        rows to include in each chunk.
    dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable'
        Back-end data type applied to the resultant :class:`DataFrame`
        (still experimental). Behaviour is as follows:

        * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame`
          (default).
        * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype`
          DataFrame.

        .. versionadded:: 2.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]
        A SQL table is returned as two-dimensional data structure with labeled
        axes.

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information will be converted to UTC.

    Examples
    --------
    >>> pd.read_sql_table('table_name', 'postgres:///db_name')  # doctest:+SKIP
    """

    check_dtype_backend(dtype_backend)
    if dtype_backend is lib.no_default:
        dtype_backend = "numpy"  # type: ignore[assignment]
    assert dtype_backend is not lib.no_default

    with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql:
        if not pandas_sql.has_table(table_name):
            raise ValueError(f"Table {table_name} not found")

        table = pandas_sql.read_table(
            table_name,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
            dtype_backend=dtype_backend,
        )

    if table is not None:
        return table
    else:
        raise ValueError(f"Table {table_name} not found", con)
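# Illustrative sketch (not part of pandas): reading a table in chunks with a
# SQLAlchemy engine. The URI, table name, and ``process`` callable are
# hypothetical.
#
# >>> from sqlalchemy import create_engine  # doctest:+SKIP
# >>> engine = create_engine("sqlite:///database.db")  # doctest:+SKIP
# >>> for chunk in pd.read_sql_table("test_data", engine, chunksize=1000):  # doctest:+SKIP
# ...     process(chunk)  # each chunk is a DataFrame of up to 1000 rows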
@overload
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params: list[Any] | Mapping[str, Any] | None = ...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    chunksize: None = ...,
    dtype: DtypeArg | None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> DataFrame:
    ...


@overload
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params: list[Any] | Mapping[str, Any] | None = ...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    chunksize: int = ...,
    dtype: DtypeArg | None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> Iterator[DataFrame]:
    ...
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    params: list[Any] | Mapping[str, Any] | None = None,
    parse_dates: list[str] | dict[str, str] | None = None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query into a DataFrame.

    Returns a DataFrame corresponding to the result set of the query
    string. Optionally provide an `index_col` parameter to use one of the
    columns as the index, otherwise default integer index will be used.

    Parameters
    ----------
    sql : str SQL query or SQLAlchemy Selectable (select or text object)
        SQL query to be executed.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Useful for SQL result sets.
    params : list, tuple or mapping, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the number of
        rows to include in each chunk.
    dtype : Type name or dict of columns
        Data type for data or columns. E.g. np.float64 or
        {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.

        .. versionadded:: 1.3.0
    dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable'
        Back-end data type applied to the resultant :class:`DataFrame`
        (still experimental). Behaviour is as follows:

        * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame`
          (default).
        * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype`
          DataFrame.

        .. versionadded:: 2.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information parsed via the `parse_dates`
    parameter will be converted to UTC.

    Examples
    --------
    >>> from sqlalchemy import create_engine  # doctest: +SKIP
    >>> engine = create_engine("sqlite:///database.db")  # doctest: +SKIP
    >>> with engine.connect() as conn, conn.begin():  # doctest: +SKIP
    ...     data = pd.read_sql_query("SELECT * FROM data", conn)  # doctest: +SKIP
    """

    check_dtype_backend(dtype_backend)
    if dtype_backend is lib.no_default:
        dtype_backend = "numpy"  # type: ignore[assignment]
    assert dtype_backend is not lib.no_default

    with pandasSQL_builder(con) as pandas_sql:
        return pandas_sql.read_query(
            sql,
            index_col=index_col,
            params=params,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            chunksize=chunksize,
            dtype=dtype,
            dtype_backend=dtype_backend,
        )
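# Illustrative sketch (not part of pandas): passing bound parameters. The
# "?" paramstyle shown is sqlite3's; other drivers use different styles
# (see PEP 249). The query is hypothetical.
#
# >>> import sqlite3
# >>> conn = sqlite3.connect(":memory:")
# >>> pd.read_sql_query("SELECT 1 AS a WHERE 1 = ?", conn, params=(1,))  # doctest:+SKIP
#    a
# 0  1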
@overload
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns: list[str] = ...,
    chunksize: None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
    dtype: DtypeArg | None = None,
) -> DataFrame:
    ...


@overload
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns: list[str] = ...,
    chunksize: int = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
    dtype: DtypeArg | None = None,
) -> Iterator[DataFrame]:
    ...
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    columns: list[str] | None = None,
    chunksize: int | None = None,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
    dtype: DtypeArg | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query`` (for backward compatibility). It will delegate
    to the specific function depending on the provided input. A SQL query
    will be routed to ``read_sql_query``, while a database table name will
    be routed to ``read_sql_table``. Note that the delegated function might
    have more specific notes about their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable (select or text object)
        SQL query to be executed or a table name.
    con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection
        ADBC provides high performance I/O with native type support, where available.
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
        for engine disposal and connection closure for the ADBC connection and
        SQLAlchemy connectable; str connections are closed automatically. See
        `here <https://docs.sqlalchemy.org/en/20/core/connections.html>`_.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default: None
        List of column names to select from SQL table (only used when reading
        a table).
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the
        number of rows to include in each chunk.
    dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable'
        Back-end data type applied to the resultant :class:`DataFrame`
        (still experimental). Behaviour is as follows:

        * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame`
          (default).
        * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype`
          DataFrame.

        .. versionadded:: 2.0
    dtype : Type name or dict of columns
        Data type for data or columns. E.g. np.float64 or
        {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.
        The argument is ignored if a table is passed instead of a query.

        .. versionadded:: 2.0.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.

    Examples
    --------
    Read data from SQL via either a SQL query or a SQL tablename.
    When using a SQLite database only SQL queries are accepted;
    providing only the SQL tablename will result in an error.

    >>> from sqlite3 import connect
    >>> conn = connect(':memory:')
    >>> df = pd.DataFrame(data=[[0, '10/11/12'], [1, '12/11/10']],
    ...                   columns=['int_column', 'date_column'])
    >>> df.to_sql(name='test_data', con=conn)
    2

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data', conn)
       int_column date_column
    0           0    10/11/12
    1           1    12/11/10

    >>> pd.read_sql('test_data', 'postgres:///db_name')  # doctest:+SKIP

    Apply date parsing to columns through the ``parse_dates`` argument.
    The ``parse_dates`` argument calls ``pd.to_datetime`` on the provided columns.
    Custom argument values for applying ``pd.to_datetime`` on a column are specified
    via a dictionary format:

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data',
    ...             conn,
    ...             parse_dates={"date_column": {"format": "%d/%m/%y"}})
       int_column date_column
    0           0  2012-11-10
    1           1  2010-11-12

    .. versionadded:: 2.2.0

       pandas now supports reading via ADBC drivers

    >>> from adbc_driver_postgresql import dbapi  # doctest:+SKIP
    >>> with dbapi.connect('postgres:///db_name') as conn:  # doctest:+SKIP
    ...     pd.read_sql('SELECT int_column FROM test_data', conn)
       int_column
    0           0
    1           1
    """

    check_dtype_backend(dtype_backend)
    if dtype_backend is lib.no_default:
        dtype_backend = "numpy"  # type: ignore[assignment]
    assert dtype_backend is not lib.no_default

    with pandasSQL_builder(con) as pandas_sql:
        if isinstance(pandas_sql, SQLiteDatabase):
            return pandas_sql.read_query(
                sql,
                index_col=index_col,
                params=params,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                chunksize=chunksize,
                dtype_backend=dtype_backend,
                dtype=dtype,
            )

        try:
            _is_table_name = pandas_sql.has_table(sql)
        except Exception:
            # using generic exception to catch errors from sql drivers (GH24988)
            _is_table_name = False

        if _is_table_name:
            return pandas_sql.read_table(
                sql,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                columns=columns,
                chunksize=chunksize,
                dtype_backend=dtype_backend,
            )
        else:
            return pandas_sql.read_query(
                sql,
                index_col=index_col,
                params=params,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                chunksize=chunksize,
                dtype_backend=dtype_backend,
                dtype=dtype,
            )
def to_sql(
    frame,
    name: str,
    con,
    schema: str | None = None,
    if_exists: Literal["fail", "replace", "append"] = "fail",
    index: bool = True,
    index_label: IndexLabel | None = None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
    method: Literal["multi"] | Callable | None = None,
    engine: str = "auto",
    **engine_kwargs,
) -> int | None:
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame, Series
    name : str
        Name of SQL table.
    con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection
        or sqlite3 DBAPI2 connection
        ADBC provides high performance I/O with native type support, where available.
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : str, optional
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, raise a ValueError.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : bool, default True
        Write DataFrame index as a column.
    index_label : str or sequence, optional
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 fallback mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        - None : Uses standard SQL ``INSERT`` clause (one per row).
        - ``'multi'``: Pass multiple values in a single ``INSERT`` clause.
        - callable with signature ``(pd_table, conn, keys, data_iter) -> int | None``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    engine : {'auto', 'sqlalchemy'}, default 'auto'
        SQL engine library to use. If 'auto', then the option
        ``io.sql.engine`` is used. The default ``io.sql.engine``
        behavior is 'sqlalchemy'.

        .. versionadded:: 1.3.0

    **engine_kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    None or int
        Number of rows affected by to_sql. None is returned if the callable
        passed into ``method`` does not return an integer number of rows.

        .. versionadded:: 1.4.0

    Notes
    -----
    The returned rows affected is the sum of the ``rowcount`` attribute of ``sqlite3.Cursor``
    or SQLAlchemy connectable. If using ADBC the returned rows are the result
    of ``Cursor.adbc_ingest``. The returned value may not reflect the exact number of written
    rows as stipulated in the
    `sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
    `SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
    documentation.
    """  # noqa: E501
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")

    if isinstance(frame, Series):
        frame = frame.to_frame()
    elif not isinstance(frame, DataFrame):
        raise NotImplementedError(
            "'frame' argument should be either a Series or a DataFrame"
        )

    with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql:
        return pandas_sql.to_sql(
            frame,
            name,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            schema=schema,
            chunksize=chunksize,
            dtype=dtype,
            method=method,
            engine=engine,
            **engine_kwargs,
        )
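# Illustrative sketch (not part of pandas): a custom ``method`` callable with
# the documented ``(pd_table, conn, keys, data_iter) -> int | None`` signature.
# It mirrors the default row-wise insert while returning the affected row
# count; ``df`` and ``engine`` are hypothetical.
#
# >>> def insert_counting(pd_table, conn, keys, data_iter):  # doctest:+SKIP
# ...     data = [dict(zip(keys, row)) for row in data_iter]
# ...     result = conn.execute(pd_table.table.insert(), data)
# ...     return result.rowcount
# >>> df.to_sql(name="test_data", con=engine, method=insert_counting)  # doctest:+SKIP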
def has_table(table_name: str, con, schema: str | None = None) -> bool:
    """
    Check if the database has a named table.

    Parameters
    ----------
    table_name : str
        Name of SQL table.
    con : ADBC Connection, SQLAlchemy connectable, str, or sqlite3 connection
        ADBC provides high performance I/O with native type support, where available.
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : str, default None
        Name of SQL schema in database to write to (if database flavor supports
        this). If None, use default schema (default).

    Returns
    -------
    boolean
    """
    with pandasSQL_builder(con, schema=schema) as pandas_sql:
        return pandas_sql.has_table(table_name)
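# Illustrative sketch (not part of pandas): checking for a table before
# writing. The connection and table name are hypothetical.
#
# >>> import sqlite3
# >>> from pandas.io.sql import has_table
# >>> conn = sqlite3.connect(":memory:")
# >>> has_table("test_data", conn)  # doctest:+SKIP
# False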
table_exists = has_table


def pandasSQL_builder(
    con,
    schema: str | None = None,
    need_transaction: bool = False,
) -> PandasSQL:
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters. Also creates a sqlalchemy connection and transaction
    if necessary.
    """
    import sqlite3

    if isinstance(con, sqlite3.Connection) or con is None:
        return SQLiteDatabase(con)

    sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

    if isinstance(con, str) and sqlalchemy is None:
        raise ImportError("Using URI string without sqlalchemy installed.")

    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
        return SQLDatabase(con, schema, need_transaction)

    adbc = import_optional_dependency("adbc_driver_manager.dbapi", errors="ignore")
    if adbc and isinstance(con, adbc.Connection):
        return ADBCDatabase(con)

    warnings.warn(
        "pandas only supports SQLAlchemy connectable (engine/connection) or "
        "database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 "
        "objects are not tested. Please consider using SQLAlchemy.",
        UserWarning,
        stacklevel=find_stack_level(),
    )
    return SQLiteDatabase(con)
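# Illustrative sketch (not part of pandas): how the builder dispatches on the
# connection type, per the branches above. The connection is hypothetical.
#
# >>> import sqlite3
# >>> with pandasSQL_builder(sqlite3.connect(":memory:")) as pandas_sql:  # doctest:+SKIP
# ...     type(pandas_sql).__name__
# 'SQLiteDatabase'
#
# A SQLAlchemy Engine/Connection or a URI string yields SQLDatabase, and an
# adbc_driver_manager.dbapi.Connection yields ADBCDatabase.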
class SQLTable(PandasObject):
    """
    For mapping Pandas tables to SQL tables.
    Uses fact that table is reflected by SQLAlchemy to
    do better type conversions.
    Also holds various flags needed to avoid having to
    pass them between functions all the time.
    """

    # TODO: support for multiIndex

    def __init__(
        self,
        name: str,
        pandas_sql_engine,
        frame=None,
        index: bool | str | list[str] | None = True,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        prefix: str = "pandas",
        index_label=None,
        schema=None,
        keys=None,
        dtype: DtypeArg | None = None,
    ) -> None:
        self.name = name
        self.pd_sql = pandas_sql_engine
        self.prefix = prefix
        self.frame = frame
        self.index = self._index_name(index, index_label)
        self.schema = schema
        self.if_exists = if_exists
        self.keys = keys
        self.dtype = dtype

        if frame is not None:
            # We want to initialize based on a dataframe
            self.table = self._create_table_setup()
        else:
            # no data provided, read-only mode
            self.table = self.pd_sql.get_table(self.name, self.schema)

        if self.table is None:
            raise ValueError(f"Could not init table '{name}'")

        if not len(self.name):
            raise ValueError("Empty table name specified")
    def exists(self):
        return self.pd_sql.has_table(self.name, self.schema)

    def sql_schema(self) -> str:
        from sqlalchemy.schema import CreateTable

        return str(CreateTable(self.table).compile(self.pd_sql.con))

    def _execute_create(self) -> None:
        # Inserting table into database, add to MetaData object
        self.table = self.table.to_metadata(self.pd_sql.meta)
        with self.pd_sql.run_transaction():
            self.table.create(bind=self.pd_sql.con)

    def create(self) -> None:
        if self.exists():
            if self.if_exists == "fail":
                raise ValueError(f"Table '{self.name}' already exists.")
            if self.if_exists == "replace":
                self.pd_sql.drop_table(self.name, self.schema)
                self._execute_create()
            elif self.if_exists == "append":
                pass
            else:
                raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
        else:
            self._execute_create()
    def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
        """
        Execute SQL statement inserting data

        Parameters
        ----------
        conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        keys : list of str
            Column names
        data_iter : generator of list
            Each item contains a list of values to be inserted
        """
        data = [dict(zip(keys, row)) for row in data_iter]
        result = conn.execute(self.table.insert(), data)
        return result.rowcount

    def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
        """
        Alternative to _execute_insert for DBs that support multi-value INSERT.

        Note: multi-value insert is usually faster for analytics DBs
        and tables containing a few columns
        but performance degrades quickly with increase of columns.
        """

        from sqlalchemy import insert

        data = [dict(zip(keys, row)) for row in data_iter]
        stmt = insert(self.table).values(data)
        result = conn.execute(stmt)
        return result.rowcount
    def insert_data(self) -> tuple[list[str], list[np.ndarray]]:
        if self.index is not None:
            temp = self.frame.copy()
            temp.index.names = self.index
            try:
                temp.reset_index(inplace=True)
            except ValueError as err:
                raise ValueError(f"duplicate name in index/columns: {err}") from err
        else:
            temp = self.frame

        column_names = list(map(str, temp.columns))
        ncols = len(column_names)
        # this just pre-allocates the list: None's will be replaced with ndarrays
        # error: List item 0 has incompatible type "None"; expected "ndarray"
        data_list: list[np.ndarray] = [None] * ncols  # type: ignore[list-item]

        for i, (_, ser) in enumerate(temp.items()):
            if ser.dtype.kind == "M":
                if isinstance(ser._values, ArrowExtensionArray):
                    import pyarrow as pa

                    if pa.types.is_date(ser.dtype.pyarrow_dtype):
                        # GH#53854 to_pydatetime not supported for pyarrow date dtypes
                        d = ser._values.to_numpy(dtype=object)
                    else:
                        with warnings.catch_warnings():
                            warnings.filterwarnings("ignore", category=FutureWarning)
                            # GH#52459 to_pydatetime will return Index[object]
                            d = np.asarray(ser.dt.to_pydatetime(), dtype=object)
                else:
                    d = ser._values.to_pydatetime()
            elif ser.dtype.kind == "m":
                vals = ser._values
                if isinstance(vals, ArrowExtensionArray):
                    vals = vals.to_numpy(dtype=np.dtype("m8[ns]"))
                # store as integers, see GH#6921, GH#7076
                d = vals.view("i8").astype(object)
            else:
                d = ser._values.astype(object)

            assert isinstance(d, np.ndarray), type(d)

            if ser._can_hold_na:
                # Note: this will miss timedeltas since they are converted to int
                mask = isna(d)
                d[mask] = None

            data_list[i] = d

        return column_names, data_list
    def insert(
        self,
        chunksize: int | None = None,
        method: Literal["multi"] | Callable | None = None,
    ) -> int | None:
        # set insert method
        if method is None:
            exec_insert = self._execute_insert
        elif method == "multi":
            exec_insert = self._execute_insert_multi
        elif callable(method):
            exec_insert = partial(method, self)
        else:
            raise ValueError(f"Invalid parameter `method`: {method}")

        keys, data_list = self.insert_data()

        nrows = len(self.frame)

        if nrows == 0:
            return 0

        if chunksize is None:
            chunksize = nrows
        elif chunksize == 0:
            raise ValueError("chunksize argument should be non-zero")

        chunks = (nrows // chunksize) + 1
        total_inserted = None
        with self.pd_sql.run_transaction() as conn:
            for i in range(chunks):
                start_i = i * chunksize
                end_i = min((i + 1) * chunksize, nrows)
                if start_i >= end_i:
                    break

                chunk_iter = zip(*(arr[start_i:end_i] for arr in data_list))
                num_inserted = exec_insert(conn, keys, chunk_iter)
                # GH 46891
                if num_inserted is not None:
                    if total_inserted is None:
                        total_inserted = num_inserted
                    else:
                        total_inserted += num_inserted
        return total_inserted
    def _query_iterator(
        self,
        result,
        exit_stack: ExitStack,
        chunksize: int | None,
        columns,
        coerce_float: bool = True,
        parse_dates=None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ):
        """Return generator through chunked result set."""
        has_read_data = False
        with exit_stack:
            while True:
                data = result.fetchmany(chunksize)
                if not data:
                    if not has_read_data:
                        yield DataFrame.from_records(
                            [], columns=columns, coerce_float=coerce_float
                        )
                    break

                has_read_data = True
                self.frame = _convert_arrays_to_dataframe(
                    data, columns, coerce_float, dtype_backend
                )

                self._harmonize_columns(
                    parse_dates=parse_dates, dtype_backend=dtype_backend
                )

                if self.index is not None:
                    self.frame.set_index(self.index, inplace=True)

                yield self.frame
    def read(
        self,
        exit_stack: ExitStack,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        chunksize: int | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        from sqlalchemy import select

        if columns is not None and len(columns) > 0:
            cols = [self.table.c[n] for n in columns]
            if self.index is not None:
                for idx in self.index[::-1]:
                    cols.insert(0, self.table.c[idx])
            sql_select = select(*cols)
        else:
            sql_select = select(self.table)
        result = self.pd_sql.execute(sql_select)
        column_names = result.keys()

        if chunksize is not None:
            return self._query_iterator(
                result,
                exit_stack,
                chunksize,
                column_names,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype_backend=dtype_backend,
            )
        else:
            data = result.fetchall()
            self.frame = _convert_arrays_to_dataframe(
                data, column_names, coerce_float, dtype_backend
            )

            self._harmonize_columns(
                parse_dates=parse_dates, dtype_backend=dtype_backend
            )

            if self.index is not None:
                self.frame.set_index(self.index, inplace=True)

            return self.frame
    def _index_name(self, index, index_label):
        # for writing: index=True to include index in sql table
        if index is True:
            nlevels = self.frame.index.nlevels
            # if index_label is specified, set this as index name(s)
            if index_label is not None:
                if not isinstance(index_label, list):
                    index_label = [index_label]
                if len(index_label) != nlevels:
                    raise ValueError(
                        "Length of 'index_label' should match number of "
                        f"levels, which is {nlevels}"
                    )
                return index_label
            # return the used column labels for the index columns
            if (
                nlevels == 1
                and "index" not in self.frame.columns
                and self.frame.index.name is None
            ):
                return ["index"]
            else:
                return com.fill_missing_names(self.frame.index.names)

        # for reading: index=(list of) string to specify column to set as index
        elif isinstance(index, str):
            return [index]
        elif isinstance(index, list):
            return index
        else:
            return None
    def _get_column_names_and_types(self, dtype_mapper):
        column_names_and_types = []
        if self.index is not None:
            for i, idx_label in enumerate(self.index):
                idx_type = dtype_mapper(self.frame.index._get_level_values(i))
                column_names_and_types.append((str(idx_label), idx_type, True))

        column_names_and_types += [
            (str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False)
            for i in range(len(self.frame.columns))
        ]

        return column_names_and_types
    def _create_table_setup(self):
        from sqlalchemy import (
            Column,
            PrimaryKeyConstraint,
            Table,
        )
        from sqlalchemy.schema import MetaData

        column_names_and_types = self._get_column_names_and_types(self._sqlalchemy_type)

        columns: list[Any] = [
            Column(name, typ, index=is_index)
            for name, typ, is_index in column_names_and_types
        ]

        if self.keys is not None:
            if not is_list_like(self.keys):
                keys = [self.keys]
            else:
                keys = self.keys
            pkc = PrimaryKeyConstraint(*keys, name=self.name + "_pk")
            columns.append(pkc)

        schema = self.schema or self.pd_sql.meta.schema

        # At this point, attach to new metadata, only attach to self.meta
        # once table is created.
        meta = MetaData()
        return Table(self.name, meta, *columns, schema=schema)
    def _harmonize_columns(
        self,
        parse_dates=None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> None:
        """
        Make the DataFrame's column types align with the SQL table
        column types.
        Need to work around limited NA value support. Floats are always
        fine, ints must always be floats if there are Null values.
        Booleans are hard because converting a bool column with None
        replaces all Nones with False. Therefore only convert bool if
        there are no NA values.
        Datetimes should already be converted to np.datetime64 if supported,
        but here we also force conversion if required.
        """
        parse_dates = _process_parse_dates_argument(parse_dates)

        for sql_col in self.table.columns:
            col_name = sql_col.name
            try:
                df_col = self.frame[col_name]

                # Handle date parsing upfront; don't try to convert columns
                # twice
                if col_name in parse_dates:
                    try:
                        fmt = parse_dates[col_name]
                    except TypeError:
                        fmt = None
                    self.frame[col_name] = _handle_date_column(df_col, format=fmt)
                    continue

                # the type the dataframe column should have
                col_type = self._get_dtype(sql_col.type)

                if (
                    col_type is datetime
                    or col_type is date
                    or col_type is DatetimeTZDtype
                ):
                    # Convert tz-aware Datetime SQL columns to UTC
                    utc = col_type is DatetimeTZDtype
                    self.frame[col_name] = _handle_date_column(df_col, utc=utc)
                elif dtype_backend == "numpy" and col_type is float:
                    # floats support NA, can always convert!
                    self.frame[col_name] = df_col.astype(col_type, copy=False)

                elif dtype_backend == "numpy" and len(df_col) == df_col.count():
                    # No NA values, can convert ints and bools
                    if col_type is np.dtype("int64") or col_type is bool:
                        self.frame[col_name] = df_col.astype(col_type, copy=False)
            except KeyError:
                pass  # this column not in results
    def _sqlalchemy_type(self, col: Index | Series):
        dtype: DtypeArg = self.dtype or {}
        if is_dict_like(dtype):
            dtype = cast(dict, dtype)
            if col.name in dtype:
                return dtype[col.name]

        # Infer type of column, while ignoring missing values.
        # Needed for inserting typed data containing NULLs, GH 8778.
        col_type = lib.infer_dtype(col, skipna=True)

        from sqlalchemy.types import (
            TIMESTAMP,
            BigInteger,
            Boolean,
            Date,
            DateTime,
            Float,
            Integer,
            SmallInteger,
            Text,
            Time,
        )

        if col_type in ("datetime64", "datetime"):
            # GH 9086: TIMESTAMP is the suggested type if the column contains
            # timezone information
            try:
                # error: Item "Index" of "Union[Index, Series]" has no attribute "dt"
                if col.dt.tz is not None:  # type: ignore[union-attr]
                    return TIMESTAMP(timezone=True)
            except AttributeError:
                # The column is actually a DatetimeIndex
                # GH 26761 or an Index with date-like data e.g. 9999-01-01
                if getattr(col, "tz", None) is not None:
                    return TIMESTAMP(timezone=True)
            return DateTime
        if col_type == "timedelta64":
            warnings.warn(
                "the 'timedelta' type is not supported, and will be "
                "written as integer values (ns frequency) to the database.",
                UserWarning,
                stacklevel=find_stack_level(),
            )
            return BigInteger
        elif col_type == "floating":
            if col.dtype == "float32":
                return Float(precision=23)
            else:
                return Float(precision=53)
        elif col_type == "integer":
            # GH35076 Map pandas integer to optimal SQLAlchemy integer type
            if col.dtype.name.lower() in ("int8", "uint8", "int16"):
                return SmallInteger
            elif col.dtype.name.lower() in ("uint16", "int32"):
                return Integer
            elif col.dtype.name.lower() == "uint64":
                raise ValueError("Unsigned 64 bit integer datatype is not supported")
            else:
                return BigInteger
        elif col_type == "boolean":
            return Boolean
        elif col_type == "date":
            return Date
        elif col_type == "time":
            return Time
        elif col_type == "complex":
            raise ValueError("Complex datatypes not supported")

        return Text
    def _get_dtype(self, sqltype):
        from sqlalchemy.types import (
            TIMESTAMP,
            Boolean,
            Date,
            DateTime,
            Float,
            Integer,
        )

        if isinstance(sqltype, Float):
            return float
        elif isinstance(sqltype, Integer):
            # TODO: Refine integer size.
            return np.dtype("int64")
        elif isinstance(sqltype, TIMESTAMP):
            # we have a timezone capable type
            if not sqltype.timezone:
                return datetime
            return DatetimeTZDtype
        elif isinstance(sqltype, DateTime):
            # Caution: np.datetime64 is also a subclass of np.number.
            return datetime
        elif isinstance(sqltype, Date):
            return date
        elif isinstance(sqltype, Boolean):
            return bool
        return object
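# Illustrative sketch (not part of pandas): the integer mapping performed by
# SQLTable._sqlalchemy_type above, stated as input dtype -> SQLAlchemy type.
# The Series values are hypothetical.
#
# >>> Series([1, 2], dtype="int8")    # -> SmallInteger
# >>> Series([1, 2], dtype="int32")   # -> Integer
# >>> Series([1, 2], dtype="int64")   # -> BigInteger
# >>> Series([1, 2], dtype="uint64")  # raises ValueError when written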
class PandasSQL(PandasObject, ABC):
    """
    Subclasses should define read_query and to_sql.
    """

    def __enter__(self) -> Self:
        return self

    def __exit__(self, *args) -> None:
        pass

    def read_table(
        self,
        table_name: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        raise NotImplementedError

    @abstractmethod
    def read_query(
        self,
        sql: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        pass

    @abstractmethod
    def to_sql(
        self,
        frame,
        name: str,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        index: bool = True,
        index_label=None,
        schema=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        method: Literal["multi"] | Callable | None = None,
        engine: str = "auto",
        **engine_kwargs,
    ) -> int | None:
        pass

    @abstractmethod
    def execute(self, sql: str | Select | TextClause, params=None):
        pass

    @abstractmethod
    def has_table(self, name: str, schema: str | None = None) -> bool:
        pass

    @abstractmethod
    def _create_sql_schema(
        self,
        frame: DataFrame,
        table_name: str,
        keys: list[str] | None = None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ) -> str:
        pass
class BaseEngine:
    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name: str,
        index: bool | str | list[str] | None = True,
        schema=None,
        chunksize: int | None = None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """
        Inserts data into already-prepared table
        """
        raise AbstractMethodError(self)
class SQLAlchemyEngine(BaseEngine):
    def __init__(self) -> None:
        import_optional_dependency(
            "sqlalchemy", extra="sqlalchemy is required for SQL support."
        )

    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name: str,
        index: bool | str | list[str] | None = True,
        schema=None,
        chunksize: int | None = None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        from sqlalchemy import exc

        try:
            return table.insert(chunksize=chunksize, method=method)
        except exc.StatementError as err:
            # GH34431
            # https://stackoverflow.com/a/67358288/6067848
            msg = r"""(\(1054, "Unknown column 'inf(e0)?' in 'field list'"\))(?#
            )|inf can not be used with MySQL"""
            err_text = str(err.orig)
            if re.search(msg, err_text):
                raise ValueError("inf cannot be used with MySQL") from err
            raise err
def get_engine(engine: str) -> BaseEngine:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.sql.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [SQLAlchemyEngine]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'sqlalchemy'.\n"
            "A suitable version of "
            "sqlalchemy is required for sql I/O "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "sqlalchemy":
        return SQLAlchemyEngine()

    raise ValueError("engine must be one of 'auto', 'sqlalchemy'")
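# Illustrative sketch (not part of pandas): resolving the engine. With the
# default option ``io.sql.engine == "auto"``, this returns a SQLAlchemyEngine
# whenever sqlalchemy is importable.
#
# >>> engine = get_engine("auto")  # doctest:+SKIP
# >>> type(engine).__name__
# 'SQLAlchemyEngine'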
class SQLDatabase(PandasSQL):
    """
    This class enables conversion between DataFrame and SQL databases
    using SQLAlchemy to handle database abstraction.

    Parameters
    ----------
    con : SQLAlchemy Connectable or URI string.
        Connectable to connect with the database. Using SQLAlchemy makes it
        possible to use any DB supported by that library.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    need_transaction : bool, default False
        If True, SQLDatabase will create a transaction.

    """

    def __init__(
        self, con, schema: str | None = None, need_transaction: bool = False
    ) -> None:
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.schema import MetaData

        # self.exit_stack cleans up the Engine and Connection and commits the
        # transaction if any of those objects was created below.
        # Cleanup happens either in self.__exit__ or at the end of the iterator
        # returned by read_sql when chunksize is not None.
        self.exit_stack = ExitStack()
        if isinstance(con, str):
            con = create_engine(con)
            self.exit_stack.callback(con.dispose)
        if isinstance(con, Engine):
            con = self.exit_stack.enter_context(con.connect())
        if need_transaction and not con.in_transaction():
            self.exit_stack.enter_context(con.begin())
        self.con = con
        self.meta = MetaData(schema=schema)
        self.returns_generator = False

    def __exit__(self, *args) -> None:
        if not self.returns_generator:
            self.exit_stack.close()
    @contextmanager
    def run_transaction(self):
        if not self.con.in_transaction():
            with self.con.begin():
                yield self.con
        else:
            yield self.con

    def execute(self, sql: str | Select | TextClause, params=None):
        """Simple passthrough to SQLAlchemy connectable"""
        args = [] if params is None else [params]
        if isinstance(sql, str):
            return self.con.exec_driver_sql(sql, *args)
        return self.con.execute(sql, *args)
    def read_table(
        self,
        table_name: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL database table into a DataFrame.

        Parameters
        ----------
        table_name : str
            Name of SQL table in database.
        index_col : string, optional, default: None
            Column to set as index.
        coerce_float : bool, default True
            Attempts to convert values of non-string, non-numeric objects
            (like decimal.Decimal) to floating point. This can result in
            loss of precision.
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg}``, where the arg corresponds
              to the keyword arguments of :func:`pandas.to_datetime`.
              Especially useful with databases without native Datetime support,
              such as SQLite.
        columns : list, default: None
            List of column names to select from SQL table.
        schema : string, default None
            Name of SQL schema in database to query (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQL database object.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.
        dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable'
            Back-end data type applied to the resultant :class:`DataFrame`
            (still experimental). Behaviour is as follows:

            * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame`
              (default).
            * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype`
              DataFrame.

            .. versionadded:: 2.0

        Returns
        -------
        DataFrame

        See Also
        --------
        pandas.read_sql_table
        SQLDatabase.read_query

        """
        self.meta.reflect(bind=self.con, only=[table_name], views=True)
        table = SQLTable(table_name, self, index=index_col, schema=schema)
        if chunksize is not None:
            self.returns_generator = True
        return table.read(
            self.exit_stack,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
            dtype_backend=dtype_backend,
        )
    @staticmethod
    def _query_iterator(
        result,
        exit_stack: ExitStack,
        chunksize: int,
        columns,
        index_col=None,
        coerce_float: bool = True,
        parse_dates=None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        with exit_stack:
            while True:
                data = result.fetchmany(chunksize)
                if not data:
                    if not has_read_data:
                        yield _wrap_result(
                            [],
                            columns,
                            index_col=index_col,
                            coerce_float=coerce_float,
                            parse_dates=parse_dates,
                            dtype=dtype,
                            dtype_backend=dtype_backend,
                        )
                    break

                has_read_data = True
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                    dtype=dtype,
                    dtype_backend=dtype_backend,
                )
1779 def read_query(
1780 self,
1781 sql: str,
1782 index_col: str | list[str] | None = None,
1783 coerce_float: bool = True,
1784 parse_dates=None,
1785 params=None,
1786 chunksize: int | None = None,
1787 dtype: DtypeArg | None = None,
1788 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
1789 ) -> DataFrame | Iterator[DataFrame]:
1790 """
1791 Read SQL query into a DataFrame.
1793 Parameters
1794 ----------
1795 sql : str
1796 SQL query to be executed.
1797 index_col : string, optional, default: None
1798 Column name to use as index for the returned DataFrame object.
1799 coerce_float : bool, default True
1800 Attempt to convert values of non-string, non-numeric objects (like
1801 decimal.Decimal) to floating point, useful for SQL result sets.
1802 params : list, tuple or dict, optional, default: None
1803 List of parameters to pass to execute method. The syntax used
1804 to pass parameters is database driver dependent. Check your
1805 database driver documentation for which of the five syntax styles,
1806 described in PEP 249's paramstyle, is supported.
1807 Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
1808 parse_dates : list or dict, default: None
1809 - List of column names to parse as dates.
1810 - Dict of ``{column_name: format string}`` where format string is
1811 strftime compatible in case of parsing string times, or is one of
1812 (D, s, ns, ms, us) in case of parsing integer timestamps.
1813 - Dict of ``{column_name: arg dict}``, where the arg dict
1814 corresponds to the keyword arguments of
1815 :func:`pandas.to_datetime`. Especially useful with databases
1816 without native Datetime support, such as SQLite.
1817 chunksize : int, default None
1818 If specified, return an iterator where `chunksize` is the number
1819 of rows to include in each chunk.
1820 dtype : Type name or dict of columns
1821 Data type for data or columns. E.g. np.float64 or
1822 {'a': np.float64, 'b': np.int32, 'c': 'Int64'}
1824 .. versionadded:: 1.3.0
1826 Returns
1827 -------
1828 DataFrame
1830 See Also
1831 --------
1832 read_sql_table : Read SQL database table into a DataFrame.
1833 read_sql
1835 """
1836 result = self.execute(sql, params)
1837 columns = result.keys()
1839 if chunksize is not None:
1840 self.returns_generator = True
1841 return self._query_iterator(
1842 result,
1843 self.exit_stack,
1844 chunksize,
1845 columns,
1846 index_col=index_col,
1847 coerce_float=coerce_float,
1848 parse_dates=parse_dates,
1849 dtype=dtype,
1850 dtype_backend=dtype_backend,
1851 )
1852 else:
1853 data = result.fetchall()
1854 frame = _wrap_result(
1855 data,
1856 columns,
1857 index_col=index_col,
1858 coerce_float=coerce_float,
1859 parse_dates=parse_dates,
1860 dtype=dtype,
1861 dtype_backend=dtype_backend,
1862 )
1863 return frame
1865 read_sql = read_query
1867 def prep_table(
1868 self,
1869 frame,
1870 name: str,
1871 if_exists: Literal["fail", "replace", "append"] = "fail",
1872 index: bool | str | list[str] | None = True,
1873 index_label=None,
1874 schema=None,
1875 dtype: DtypeArg | None = None,
1876 ) -> SQLTable:
1877 """
1878 Prepares a table in the database for data insertion, creating it if needed.
1879 """
1880 if dtype:
1881 if not is_dict_like(dtype):
1882 # error: Value expression in dictionary comprehension has incompatible
1883 # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
1884 # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
1885 # Type[str], Type[float], Type[int], Type[complex], Type[bool],
1886 # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
1887 # dtype[Any], Type[object]]"
1888 dtype = {col_name: dtype for col_name in frame} # type: ignore[misc]
1889 else:
1890 dtype = cast(dict, dtype)
1892 from sqlalchemy.types import TypeEngine
1894 for col, my_type in dtype.items():
1895 if isinstance(my_type, type) and issubclass(my_type, TypeEngine):
1896 pass
1897 elif isinstance(my_type, TypeEngine):
1898 pass
1899 else:
1900 raise ValueError(f"The type of {col} is not a SQLAlchemy type")
1902 table = SQLTable(
1903 name,
1904 self,
1905 frame=frame,
1906 index=index,
1907 if_exists=if_exists,
1908 index_label=index_label,
1909 schema=schema,
1910 dtype=dtype,
1911 )
1912 table.create()
1913 return table
1915 def check_case_sensitive(
1916 self,
1917 name: str,
1918 schema: str | None,
1919 ) -> None:
1920 """
1921 Checks table name for issues with case-sensitivity.
1922 Method is called after data is inserted.
1923 """
1924 if not name.isdigit() and not name.islower():
1925 # check for potential case-sensitivity issues (GH7815)
1926 # Only check when name is not a number and name is not lower case
1927 from sqlalchemy import inspect as sqlalchemy_inspect
1929 insp = sqlalchemy_inspect(self.con)
1930 table_names = insp.get_table_names(schema=schema or self.meta.schema)
1931 if name not in table_names:
1932 msg = (
1933 f"The provided table name '{name}' is not found exactly as "
1934 "such in the database after writing the table, possibly "
1935 "due to case sensitivity issues. Consider using lower "
1936 "case table names."
1937 )
1938 warnings.warn(
1939 msg,
1940 UserWarning,
1941 stacklevel=find_stack_level(),
1942 )
1944 def to_sql(
1945 self,
1946 frame,
1947 name: str,
1948 if_exists: Literal["fail", "replace", "append"] = "fail",
1949 index: bool = True,
1950 index_label=None,
1951 schema: str | None = None,
1952 chunksize: int | None = None,
1953 dtype: DtypeArg | None = None,
1954 method: Literal["multi"] | Callable | None = None,
1955 engine: str = "auto",
1956 **engine_kwargs,
1957 ) -> int | None:
1958 """
1959 Write records stored in a DataFrame to a SQL database.
1961 Parameters
1962 ----------
1963 frame : DataFrame
1964 name : string
1965 Name of SQL table.
1966 if_exists : {'fail', 'replace', 'append'}, default 'fail'
1967 - fail: If table exists, raise a ValueError.
1968 - replace: If table exists, drop it, recreate it, and insert data.
1969 - append: If table exists, insert data. Create it if it does not exist.
1970 index : boolean, default True
1971 Write DataFrame index as a column.
1972 index_label : string or sequence, default None
1973 Column label for index column(s). If None is given (default) and
1974 `index` is True, then the index names are used.
1975 A sequence should be given if the DataFrame uses MultiIndex.
1976 schema : string, default None
1977 Name of SQL schema in database to write to (if database flavor
1978 supports this). If specified, this overwrites the default
1979 schema of the SQLDatabase object.
1980 chunksize : int, default None
1981 If not None, then rows will be written in batches of this size at a
1982 time. If None, all rows will be written at once.
1983 dtype : single type or dict of column name to SQL type, default None
1984 Optional specifying the datatype for columns. The SQL type should
1985 be a SQLAlchemy type. If all columns are of the same type, one
1986 single value can be used.
1987 method : {None, 'multi', callable}, default None
1988 Controls the SQL insertion clause used:
1990 * None : Uses standard SQL ``INSERT`` clause (one per row).
1991 * 'multi': Pass multiple values in a single ``INSERT`` clause.
1992 * callable with signature ``(pd_table, conn, keys, data_iter)``.
1994 Details and a sample callable implementation can be found in the
1995 section :ref:`insert method <io.sql.method>`.
1996 engine : {'auto', 'sqlalchemy'}, default 'auto'
1997 SQL engine library to use. If 'auto', then the option
1998 ``io.sql.engine`` is used. The default ``io.sql.engine``
1999 behavior is 'sqlalchemy'.
2001 .. versionadded:: 1.3.0
2003 **engine_kwargs
2004 Any additional kwargs are passed to the engine.
2005 """
2006 sql_engine = get_engine(engine)
2008 table = self.prep_table(
2009 frame=frame,
2010 name=name,
2011 if_exists=if_exists,
2012 index=index,
2013 index_label=index_label,
2014 schema=schema,
2015 dtype=dtype,
2016 )
2018 total_inserted = sql_engine.insert_records(
2019 table=table,
2020 con=self.con,
2021 frame=frame,
2022 name=name,
2023 index=index,
2024 schema=schema,
2025 chunksize=chunksize,
2026 method=method,
2027 **engine_kwargs,
2028 )
2030 self.check_case_sensitive(name=name, schema=schema)
2031 return total_inserted
2033 @property
2034 def tables(self):
2035 return self.meta.tables
2037 def has_table(self, name: str, schema: str | None = None) -> bool:
2038 from sqlalchemy import inspect as sqlalchemy_inspect
2040 insp = sqlalchemy_inspect(self.con)
2041 return insp.has_table(name, schema or self.meta.schema)
2043 def get_table(self, table_name: str, schema: str | None = None) -> Table:
2044 from sqlalchemy import (
2045 Numeric,
2046 Table,
2047 )
2049 schema = schema or self.meta.schema
2050 tbl = Table(table_name, self.meta, autoload_with=self.con, schema=schema)
2051 for column in tbl.columns:
2052 if isinstance(column.type, Numeric):
2053 column.type.asdecimal = False
2054 return tbl
2056 def drop_table(self, table_name: str, schema: str | None = None) -> None:
2057 schema = schema or self.meta.schema
2058 if self.has_table(table_name, schema):
2059 self.meta.reflect(
2060 bind=self.con, only=[table_name], schema=schema, views=True
2061 )
2062 with self.run_transaction():
2063 self.get_table(table_name, schema).drop(bind=self.con)
2064 self.meta.clear()
2066 def _create_sql_schema(
2067 self,
2068 frame: DataFrame,
2069 table_name: str,
2070 keys: list[str] | None = None,
2071 dtype: DtypeArg | None = None,
2072 schema: str | None = None,
2073 ) -> str:
2074 table = SQLTable(
2075 table_name,
2076 self,
2077 frame=frame,
2078 index=False,
2079 keys=keys,
2080 dtype=dtype,
2081 schema=schema,
2082 )
2083 return str(table.sql_schema())
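# Usage sketch (illustrative only, not part of this module): exercises the
# chunked-read path implemented by SQLDatabase._query_iterator above. The
# in-memory SQLite engine and the table name "demo_chunks" are hypothetical
# choices; assumes SQLAlchemy is installed.
def _demo_chunked_read():  # pragma: no cover
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine("sqlite://")
    pd.DataFrame({"a": range(10)}).to_sql("demo_chunks", engine, index=False)
    # A non-None chunksize makes read_sql return a generator of DataFrames,
    # each produced by a fetchmany(chunksize) call in _query_iterator.
    chunks = pd.read_sql("SELECT * FROM demo_chunks", engine, chunksize=4)
    return [len(chunk) for chunk in chunks]  # [4, 4, 2]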
2086# ---- SQL without SQLAlchemy ----
2089class ADBCDatabase(PandasSQL):
2090 """
2091 This class enables conversion between DataFrame and SQL databases
2092 using ADBC to handle database abstraction.
2094 Parameters
2095 ----------
2096 con : adbc_driver_manager.dbapi.Connection
2097 """
2099 def __init__(self, con) -> None:
2100 self.con = con
2102 @contextmanager
2103 def run_transaction(self):
2104 with self.con.cursor() as cur:
2105 try:
2106 yield cur
2107 except Exception:
2108 self.con.rollback()
2109 raise
2110 self.con.commit()
2112 def execute(self, sql: str | Select | TextClause, params=None):
2113 if not isinstance(sql, str):
2114 raise TypeError("Query must be a string unless using sqlalchemy.")
2115 args = [] if params is None else [params]
2116 cur = self.con.cursor()
2117 try:
2118 cur.execute(sql, *args)
2119 return cur
2120 except Exception as exc:
2121 try:
2122 self.con.rollback()
2123 except Exception as inner_exc: # pragma: no cover
2124 ex = DatabaseError(
2125 f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
2126 )
2127 raise ex from inner_exc
2129 ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}")
2130 raise ex from exc
2132 def read_table(
2133 self,
2134 table_name: str,
2135 index_col: str | list[str] | None = None,
2136 coerce_float: bool = True,
2137 parse_dates=None,
2138 columns=None,
2139 schema: str | None = None,
2140 chunksize: int | None = None,
2141 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
2142 ) -> DataFrame | Iterator[DataFrame]:
2143 """
2144 Read SQL database table into a DataFrame.
2146 Parameters
2147 ----------
2148 table_name : str
2149 Name of SQL table in database.
2150 coerce_float : bool, default True
2151 Raises NotImplementedError
2152 parse_dates : list or dict, default: None
2153 - List of column names to parse as dates.
2154 - Dict of ``{column_name: format string}`` where format string is
2155 strftime compatible in case of parsing string times, or is one of
2156 (D, s, ns, ms, us) in case of parsing integer timestamps.
2157 - Dict of ``{column_name: arg}``, where the arg corresponds
2158 to the keyword arguments of :func:`pandas.to_datetime`.
2159 Especially useful with databases without native Datetime support,
2160 such as SQLite.
2161 columns : list, default: None
2162 List of column names to select from SQL table.
2163 schema : string, default None
2164 Name of SQL schema in database to query (if database flavor
2165 supports this). If specified, this overwrites the default
2166 schema of the SQL database object.
2167 chunksize : int, default None
2168 Raises NotImplementedError
2169 dtype_backend : {'numpy_nullable', 'pyarrow'}, default 'numpy_nullable'
2170 Back-end data type applied to the resultant :class:`DataFrame`
2171 (still experimental). Behaviour is as follows:
2173 * ``"numpy_nullable"``: returns nullable-dtype-backed :class:`DataFrame`
2174 (default).
2175 * ``"pyarrow"``: returns pyarrow-backed nullable :class:`ArrowDtype`
2176 DataFrame.
2178 .. versionadded:: 2.0
2180 Returns
2181 -------
2182 DataFrame
2184 See Also
2185 --------
2186 pandas.read_sql_table
2187 SQLDatabase.read_query
2189 """
2190 if coerce_float is not True:
2191 raise NotImplementedError(
2192 "'coerce_float' is not implemented for ADBC drivers"
2193 )
2194 if chunksize:
2195 raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
2197 if columns:
2198 if index_col:
2199 index_select = maybe_make_list(index_col)
2200 else:
2201 index_select = []
2202 to_select = index_select + columns
2203 select_list = ", ".join(f'"{x}"' for x in to_select)
2204 else:
2205 select_list = "*"
2206 if schema:
2207 stmt = f"SELECT {select_list} FROM {schema}.{table_name}"
2208 else:
2209 stmt = f"SELECT {select_list} FROM {table_name}"
2211 mapping: type[ArrowDtype] | None | Callable
2212 if dtype_backend == "pyarrow":
2213 mapping = ArrowDtype
2214 elif dtype_backend == "numpy_nullable":
2215 from pandas.io._util import _arrow_dtype_mapping
2217 mapping = _arrow_dtype_mapping().get
2218 elif using_pyarrow_string_dtype():
2219 from pandas.io._util import arrow_string_types_mapper
2221 mapping = arrow_string_types_mapper()
2222 else:
2223 mapping = None
2225 with self.con.cursor() as cur:
2226 cur.execute(stmt)
2227 df = cur.fetch_arrow_table().to_pandas(types_mapper=mapping)
2229 return _wrap_result_adbc(
2230 df,
2231 index_col=index_col,
2232 parse_dates=parse_dates,
2233 )
2235 def read_query(
2236 self,
2237 sql: str,
2238 index_col: str | list[str] | None = None,
2239 coerce_float: bool = True,
2240 parse_dates=None,
2241 params=None,
2242 chunksize: int | None = None,
2243 dtype: DtypeArg | None = None,
2244 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
2245 ) -> DataFrame | Iterator[DataFrame]:
2246 """
2247 Read SQL query into a DataFrame.
2249 Parameters
2250 ----------
2251 sql : str
2252 SQL query to be executed.
2253 index_col : string, optional, default: None
2254 Column name to use as index for the returned DataFrame object.
2255 coerce_float : bool, default True
2256 Raises NotImplementedError
2257 params : list, tuple or dict, optional, default: None
2258 Raises NotImplementedError
2259 parse_dates : list or dict, default: None
2260 - List of column names to parse as dates.
2261 - Dict of ``{column_name: format string}`` where format string is
2262 strftime compatible in case of parsing string times, or is one of
2263 (D, s, ns, ms, us) in case of parsing integer timestamps.
2264 - Dict of ``{column_name: arg dict}``, where the arg dict
2265 corresponds to the keyword arguments of
2266 :func:`pandas.to_datetime`. Especially useful with databases
2267 without native Datetime support, such as SQLite.
2268 chunksize : int, default None
2269 Raises NotImplementedError
2270 dtype : Type name or dict of columns
2271 Data type for data or columns. E.g. np.float64 or
2272 {'a': np.float64, 'b': np.int32, 'c': 'Int64'}
2274 .. versionadded:: 1.3.0
2276 Returns
2277 -------
2278 DataFrame
2280 See Also
2281 --------
2282 read_sql_table : Read SQL database table into a DataFrame.
2283 read_sql
2285 """
2286 if coerce_float is not True:
2287 raise NotImplementedError(
2288 "'coerce_float' is not implemented for ADBC drivers"
2289 )
2290 if params:
2291 raise NotImplementedError("'params' is not implemented for ADBC drivers")
2292 if chunksize:
2293 raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
2295 mapping: type[ArrowDtype] | None | Callable
2296 if dtype_backend == "pyarrow":
2297 mapping = ArrowDtype
2298 elif dtype_backend == "numpy_nullable":
2299 from pandas.io._util import _arrow_dtype_mapping
2301 mapping = _arrow_dtype_mapping().get
2302 else:
2303 mapping = None
2305 with self.con.cursor() as cur:
2306 cur.execute(sql)
2307 df = cur.fetch_arrow_table().to_pandas(types_mapper=mapping)
2309 return _wrap_result_adbc(
2310 df,
2311 index_col=index_col,
2312 parse_dates=parse_dates,
2313 dtype=dtype,
2314 )
2316 read_sql = read_query
2318 def to_sql(
2319 self,
2320 frame,
2321 name: str,
2322 if_exists: Literal["fail", "replace", "append"] = "fail",
2323 index: bool = True,
2324 index_label=None,
2325 schema: str | None = None,
2326 chunksize: int | None = None,
2327 dtype: DtypeArg | None = None,
2328 method: Literal["multi"] | Callable | None = None,
2329 engine: str = "auto",
2330 **engine_kwargs,
2331 ) -> int | None:
2332 """
2333 Write records stored in a DataFrame to a SQL database.
2335 Parameters
2336 ----------
2337 frame : DataFrame
2338 name : string
2339 Name of SQL table.
2340 if_exists : {'fail', 'replace', 'append'}, default 'fail'
2341 - fail: If table exists, raise a ValueError.
2342 - replace: If table exists, drop it, recreate it, and insert data.
2343 - append: If table exists, insert data. Create it if it does not exist.
2344 index : boolean, default True
2345 Write DataFrame index as a column.
2346 index_label : string or sequence, default None
2347 Raises NotImplementedError
2348 schema : string, default None
2349 Name of SQL schema in database to write to (if database flavor
2350 supports this). If specified, this overwrites the default
2351 schema of the SQLDatabase object.
2352 chunksize : int, default None
2353 Raises NotImplementedError
2354 dtype : single type or dict of column name to SQL type, default None
2355 Raises NotImplementedError
2356 method : {None, 'multi', callable}, default None
2357 Raises NotImplementedError
2358 engine : {'auto', 'sqlalchemy'}, default 'auto'
2359 Raises NotImplementedError if not set to 'auto'
2360 """
2361 if index_label:
2362 raise NotImplementedError(
2363 "'index_label' is not implemented for ADBC drivers"
2364 )
2365 if chunksize:
2366 raise NotImplementedError("'chunksize' is not implemented for ADBC drivers")
2367 if dtype:
2368 raise NotImplementedError("'dtype' is not implemented for ADBC drivers")
2369 if method:
2370 raise NotImplementedError("'method' is not implemented for ADBC drivers")
2371 if engine != "auto":
2372 raise NotImplementedError(
2373 "engine != 'auto' not implemented for ADBC drivers"
2374 )
2376 if schema:
2377 table_name = f"{schema}.{name}"
2378 else:
2379 table_name = name
2381 # pandas if_exists="append" will still create the
2382 # table if it does not exist; ADBC treats "append" and "create"
2383 # as distinct, explicit modes, so the semantics differ slightly
2384 # between the two libraries
2385 mode = "create"
2386 if self.has_table(name, schema):
2387 if if_exists == "fail":
2388 raise ValueError(f"Table '{table_name}' already exists.")
2389 elif if_exists == "replace":
2390 with self.con.cursor() as cur:
2391 cur.execute(f"DROP TABLE {table_name}")
2392 elif if_exists == "append":
2393 mode = "append"
2395 import pyarrow as pa
2397 try:
2398 tbl = pa.Table.from_pandas(frame, preserve_index=index)
2399 except pa.ArrowNotImplementedError as exc:
2400 raise ValueError("datatypes not supported") from exc
2402 with self.con.cursor() as cur:
2403 total_inserted = cur.adbc_ingest(
2404 table_name=name, data=tbl, mode=mode, db_schema_name=schema
2405 )
2407 self.con.commit()
2408 return total_inserted
2410 def has_table(self, name: str, schema: str | None = None) -> bool:
2411 meta = self.con.adbc_get_objects(
2412 db_schema_filter=schema, table_name_filter=name
2413 ).read_all()
2415 for catalog_schema in meta["catalog_db_schemas"].to_pylist():
2416 if not catalog_schema:
2417 continue
2418 for schema_record in catalog_schema:
2419 if not schema_record:
2420 continue
2422 for table_record in schema_record["db_schema_tables"]:
2423 if table_record["table_name"] == name:
2424 return True
2426 return False
2428 def _create_sql_schema(
2429 self,
2430 frame: DataFrame,
2431 table_name: str,
2432 keys: list[str] | None = None,
2433 dtype: DtypeArg | None = None,
2434 schema: str | None = None,
2435 ) -> str:
2436 raise NotImplementedError("not implemented for adbc")
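# Usage sketch (illustrative only): the ADBC path above round-trips frames
# through Arrow. Assumes the optional adbc-driver-sqlite package is
# installed; the table name "demo_adbc" is hypothetical.
def _demo_adbc_roundtrip():  # pragma: no cover
    import adbc_driver_sqlite.dbapi
    import pandas as pd

    with adbc_driver_sqlite.dbapi.connect(":memory:") as con:
        pd.DataFrame({"a": [1, 2]}).to_sql("demo_adbc", con, index=False)
        # read_query fetches an Arrow table and converts it via to_pandas
        return pd.read_sql("SELECT * FROM demo_adbc", con)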
2439# sqlite-specific sql strings and handler class
2440# dictionary used for readability purposes
2441_SQL_TYPES = {
2442 "string": "TEXT",
2443 "floating": "REAL",
2444 "integer": "INTEGER",
2445 "datetime": "TIMESTAMP",
2446 "date": "DATE",
2447 "time": "TIME",
2448 "boolean": "INTEGER",
2449}
2452def _get_unicode_name(name: object):
2453 try:
2454 uname = str(name).encode("utf-8", "strict").decode("utf-8")
2455 except UnicodeError as err:
2456 raise ValueError(f"Cannot convert identifier to UTF-8: '{name}'") from err
2457 return uname
2460def _get_valid_sqlite_name(name: object):
2461 # See https://stackoverflow.com/questions/6514274/how-do-you-escape-strings\
2462 # -for-sqlite-table-column-names-in-python
2463 # Ensure the string can be encoded as UTF-8.
2464 # Ensure the string does not include any NUL characters.
2465 # Replace all " with "".
2466 # Wrap the entire thing in double quotes.
2468 uname = _get_unicode_name(name)
2469 if not len(uname):
2470 raise ValueError("Empty table or column name specified")
2472 nul_index = uname.find("\x00")
2473 if nul_index >= 0:
2474 raise ValueError("SQLite identifier cannot contain NULs")
2475 return '"' + uname.replace('"', '""') + '"'
2478class SQLiteTable(SQLTable):
2479 """
2480 Patch the SQLTable for fallback support.
2481 Instead of a table variable, just use the CREATE TABLE statement.
2482 """
2484 def __init__(self, *args, **kwargs) -> None:
2485 super().__init__(*args, **kwargs)
2487 self._register_date_adapters()
2489 def _register_date_adapters(self) -> None:
2490 # GH 8341
2491 # register an adapter callable for datetime.time object
2492 import sqlite3
2494 # this will transform time(12,34,56,789) into '12:34:56.000789'
2495 # (this is what sqlalchemy does)
2496 def _adapt_time(t) -> str:
2497 # This is faster than strftime
2498 return f"{t.hour:02d}:{t.minute:02d}:{t.second:02d}.{t.microsecond:06d}"
2500 # Also register adapters for date/datetime and co
2501 # xref https://docs.python.org/3.12/library/sqlite3.html#adapter-and-converter-recipes
2502 # Python 3.12+ doesn't auto-register adapters for us anymore
2504 adapt_date_iso = lambda val: val.isoformat()
2505 adapt_datetime_iso = lambda val: val.isoformat(" ")
2507 sqlite3.register_adapter(time, _adapt_time)
2509 sqlite3.register_adapter(date, adapt_date_iso)
2510 sqlite3.register_adapter(datetime, adapt_datetime_iso)
2512 convert_date = lambda val: date.fromisoformat(val.decode())
2513 convert_timestamp = lambda val: datetime.fromisoformat(val.decode())
2515 sqlite3.register_converter("date", convert_date)
2516 sqlite3.register_converter("timestamp", convert_timestamp)
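    # e.g. date(2021, 1, 2) is stored as '2021-01-02' and read back by the
    # "date" converter; note the converters only fire when the connection is
    # opened with detect_types=sqlite3.PARSE_DECLTYPES.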
2518 def sql_schema(self) -> str:
2519 return str(";\n".join(self.table))
2521 def _execute_create(self) -> None:
2522 with self.pd_sql.run_transaction() as conn:
2523 for stmt in self.table:
2524 conn.execute(stmt)
2526 def insert_statement(self, *, num_rows: int) -> str:
2527 names = list(map(str, self.frame.columns))
2528 wld = "?" # wildcard char
2529 escape = _get_valid_sqlite_name
2531 if self.index is not None:
2532 for idx in self.index[::-1]:
2533 names.insert(0, idx)
2535 bracketed_names = [escape(column) for column in names]
2536 col_names = ",".join(bracketed_names)
2538 row_wildcards = ",".join([wld] * len(names))
2539 wildcards = ",".join([f"({row_wildcards})" for _ in range(num_rows)])
2540 insert_statement = (
2541 f"INSERT INTO {escape(self.name)} ({col_names}) VALUES {wildcards}"
2542 )
2543 return insert_statement
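    # For example (hypothetical table "tbl" with columns "a" and "b", no
    # index): insert_statement(num_rows=2) returns
    #   INSERT INTO "tbl" ("a","b") VALUES (?,?),(?,?)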
2545 def _execute_insert(self, conn, keys, data_iter) -> int:
2546 data_list = list(data_iter)
2547 conn.executemany(self.insert_statement(num_rows=1), data_list)
2548 return conn.rowcount
2550 def _execute_insert_multi(self, conn, keys, data_iter) -> int:
2551 data_list = list(data_iter)
2552 flattened_data = [x for row in data_list for x in row]
2553 conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data)
2554 return conn.rowcount
2556 def _create_table_setup(self):
2557 """
2558 Return a list of SQL statements that creates a table reflecting the
2559 structure of a DataFrame. The first entry will be a CREATE TABLE
2560 statement while the rest will be CREATE INDEX statements.
2561 """
2562 column_names_and_types = self._get_column_names_and_types(self._sql_type_name)
2563 escape = _get_valid_sqlite_name
2565 create_tbl_stmts = [
2566 escape(cname) + " " + ctype for cname, ctype, _ in column_names_and_types
2567 ]
2569 if self.keys is not None and len(self.keys):
2570 if not is_list_like(self.keys):
2571 keys = [self.keys]
2572 else:
2573 keys = self.keys
2574 cnames_br = ", ".join([escape(c) for c in keys])
2575 create_tbl_stmts.append(
2576 f"CONSTRAINT {self.name}_pk PRIMARY KEY ({cnames_br})"
2577 )
2578 if self.schema:
2579 schema_name = self.schema + "."
2580 else:
2581 schema_name = ""
2582 create_stmts = [
2583 "CREATE TABLE "
2584 + schema_name
2585 + escape(self.name)
2586 + " (\n"
2587 + ",\n ".join(create_tbl_stmts)
2588 + "\n)"
2589 ]
2591 ix_cols = [cname for cname, _, is_index in column_names_and_types if is_index]
2592 if len(ix_cols):
2593 cnames = "_".join(ix_cols)
2594 cnames_br = ",".join([escape(c) for c in ix_cols])
2595 create_stmts.append(
2596 "CREATE INDEX "
2597 + escape("ix_" + self.name + "_" + cnames)
2598 + "ON "
2599 + escape(self.name)
2600 + " ("
2601 + cnames_br
2602 + ")"
2603 )
2605 return create_stmts
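    # For example, a frame with integer column "a" indexed by "idx"
    # (hypothetical names) yields two statements:
    #   CREATE TABLE "tbl" ("idx" INTEGER, "a" INTEGER)
    #   CREATE INDEX "ix_tbl_idx" ON "tbl" ("idx")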
2607 def _sql_type_name(self, col):
2608 dtype: DtypeArg = self.dtype or {}
2609 if is_dict_like(dtype):
2610 dtype = cast(dict, dtype)
2611 if col.name in dtype:
2612 return dtype[col.name]
2614 # Infer type of column, while ignoring missing values.
2615 # Needed for inserting typed data containing NULLs, GH 8778.
2616 col_type = lib.infer_dtype(col, skipna=True)
2618 if col_type == "timedelta64":
2619 warnings.warn(
2620 "the 'timedelta' type is not supported, and will be "
2621 "written as integer values (ns frequency) to the database.",
2622 UserWarning,
2623 stacklevel=find_stack_level(),
2624 )
2625 col_type = "integer"
2627 elif col_type == "datetime64":
2628 col_type = "datetime"
2630 elif col_type == "empty":
2631 col_type = "string"
2633 elif col_type == "complex":
2634 raise ValueError("Complex datatypes not supported")
2636 if col_type not in _SQL_TYPES:
2637 col_type = "string"
2639 return _SQL_TYPES[col_type]
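# Sketch of the inference used by _sql_type_name above (hypothetical data):
# pandas infers the column kind while skipping NULLs, then maps the result
# through _SQL_TYPES, falling back to TEXT for anything unrecognised.
def _demo_sqlite_type_inference():  # pragma: no cover
    import pandas as pd
    from pandas._libs import lib

    col = pd.Series([1.5, None, 2.5])
    inferred = lib.infer_dtype(col, skipna=True)  # "floating"
    return _SQL_TYPES.get(inferred, _SQL_TYPES["string"])  # "REAL"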
2642class SQLiteDatabase(PandasSQL):
2643 """
2644 Version of SQLDatabase to support SQLite connections (fallback without
2645 SQLAlchemy). This should only be used internally.
2647 Parameters
2648 ----------
2649 con : sqlite connection object
2651 """
2653 def __init__(self, con) -> None:
2654 self.con = con
2656 @contextmanager
2657 def run_transaction(self):
2658 cur = self.con.cursor()
2659 try:
2660 yield cur
2661 self.con.commit()
2662 except Exception:
2663 self.con.rollback()
2664 raise
2665 finally:
2666 cur.close()
2668 def execute(self, sql: str | Select | TextClause, params=None):
2669 if not isinstance(sql, str):
2670 raise TypeError("Query must be a string unless using sqlalchemy.")
2671 args = [] if params is None else [params]
2672 cur = self.con.cursor()
2673 try:
2674 cur.execute(sql, *args)
2675 return cur
2676 except Exception as exc:
2677 try:
2678 self.con.rollback()
2679 except Exception as inner_exc: # pragma: no cover
2680 ex = DatabaseError(
2681 f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
2682 )
2683 raise ex from inner_exc
2685 ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}")
2686 raise ex from exc
2688 @staticmethod
2689 def _query_iterator(
2690 cursor,
2691 chunksize: int,
2692 columns,
2693 index_col=None,
2694 coerce_float: bool = True,
2695 parse_dates=None,
2696 dtype: DtypeArg | None = None,
2697 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
2698 ):
2699 """Return generator through chunked result set"""
2700 has_read_data = False
2701 while True:
2702 data = cursor.fetchmany(chunksize)
2703 if isinstance(data, tuple):
2704 data = list(data)
2705 if not data:
2706 cursor.close()
2707 if not has_read_data:
2708 result = DataFrame.from_records(
2709 [], columns=columns, coerce_float=coerce_float
2710 )
2711 if dtype:
2712 result = result.astype(dtype)
2713 yield result
2714 break
2716 has_read_data = True
2717 yield _wrap_result(
2718 data,
2719 columns,
2720 index_col=index_col,
2721 coerce_float=coerce_float,
2722 parse_dates=parse_dates,
2723 dtype=dtype,
2724 dtype_backend=dtype_backend,
2725 )
2727 def read_query(
2728 self,
2729 sql,
2730 index_col=None,
2731 coerce_float: bool = True,
2732 parse_dates=None,
2733 params=None,
2734 chunksize: int | None = None,
2735 dtype: DtypeArg | None = None,
2736 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
2737 ) -> DataFrame | Iterator[DataFrame]:
2738 cursor = self.execute(sql, params)
2739 columns = [col_desc[0] for col_desc in cursor.description]
2741 if chunksize is not None:
2742 return self._query_iterator(
2743 cursor,
2744 chunksize,
2745 columns,
2746 index_col=index_col,
2747 coerce_float=coerce_float,
2748 parse_dates=parse_dates,
2749 dtype=dtype,
2750 dtype_backend=dtype_backend,
2751 )
2752 else:
2753 data = self._fetchall_as_list(cursor)
2754 cursor.close()
2756 frame = _wrap_result(
2757 data,
2758 columns,
2759 index_col=index_col,
2760 coerce_float=coerce_float,
2761 parse_dates=parse_dates,
2762 dtype=dtype,
2763 dtype_backend=dtype_backend,
2764 )
2765 return frame
2767 def _fetchall_as_list(self, cur):
2768 result = cur.fetchall()
2769 if not isinstance(result, list):
2770 result = list(result)
2771 return result
2773 def to_sql(
2774 self,
2775 frame,
2776 name: str,
2777 if_exists: str = "fail",
2778 index: bool = True,
2779 index_label=None,
2780 schema=None,
2781 chunksize: int | None = None,
2782 dtype: DtypeArg | None = None,
2783 method: Literal["multi"] | Callable | None = None,
2784 engine: str = "auto",
2785 **engine_kwargs,
2786 ) -> int | None:
2787 """
2788 Write records stored in a DataFrame to a SQL database.
2790 Parameters
2791 ----------
2792 frame: DataFrame
2793 name: string
2794 Name of SQL table.
2795 if_exists: {'fail', 'replace', 'append'}, default 'fail'
2796 fail: If table exists, raise a ValueError.
2797 replace: If table exists, drop it, recreate it, and insert data.
2798 append: If table exists, insert data. Create if it does not exist.
2799 index : bool, default True
2800 Write DataFrame index as a column
2801 index_label : string or sequence, default None
2802 Column label for index column(s). If None is given (default) and
2803 `index` is True, then the index names are used.
2804 A sequence should be given if the DataFrame uses MultiIndex.
2805 schema : string, default None
2806 Ignored parameter included for compatibility with SQLAlchemy
2807 version of ``to_sql``.
2808 chunksize : int, default None
2809 If not None, then rows will be written in batches of this
2810 size at a time. If None, all rows will be written at once.
2811 dtype : single type or dict of column name to SQL type, default None
2812 Optional specifying the datatype for columns. The SQL type should
2813 be a string. If all columns are of the same type, one single value
2814 can be used.
2815 method : {None, 'multi', callable}, default None
2816 Controls the SQL insertion clause used:
2818 * None : Uses standard SQL ``INSERT`` clause (one per row).
2819 * 'multi': Pass multiple values in a single ``INSERT`` clause.
2820 * callable with signature ``(pd_table, conn, keys, data_iter)``.
2822 Details and a sample callable implementation can be found in the
2823 section :ref:`insert method <io.sql.method>`.
2824 """
2825 if dtype:
2826 if not is_dict_like(dtype):
2827 # error: Value expression in dictionary comprehension has incompatible
2828 # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
2829 # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
2830 # Type[str], Type[float], Type[int], Type[complex], Type[bool],
2831 # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
2832 # dtype[Any], Type[object]]"
2833 dtype = {col_name: dtype for col_name in frame} # type: ignore[misc]
2834 else:
2835 dtype = cast(dict, dtype)
2837 for col, my_type in dtype.items():
2838 if not isinstance(my_type, str):
2839 raise ValueError(f"{col} ({my_type}) not a string")
2841 table = SQLiteTable(
2842 name,
2843 self,
2844 frame=frame,
2845 index=index,
2846 if_exists=if_exists,
2847 index_label=index_label,
2848 dtype=dtype,
2849 )
2850 table.create()
2851 return table.insert(chunksize, method)
2853 def has_table(self, name: str, schema: str | None = None) -> bool:
2854 wld = "?"
2855 query = f"""
2856 SELECT
2857 name
2858 FROM
2859 sqlite_master
2860 WHERE
2861 type IN ('table', 'view')
2862 AND name={wld};
2863 """
2865 return len(self.execute(query, [name]).fetchall()) > 0
2867 def get_table(self, table_name: str, schema: str | None = None) -> None:
2868 return None # not supported in fallback mode
2870 def drop_table(self, name: str, schema: str | None = None) -> None:
2871 drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
2872 self.execute(drop_sql)
2874 def _create_sql_schema(
2875 self,
2876 frame,
2877 table_name: str,
2878 keys=None,
2879 dtype: DtypeArg | None = None,
2880 schema: str | None = None,
2881 ) -> str:
2882 table = SQLiteTable(
2883 table_name,
2884 self,
2885 frame=frame,
2886 index=False,
2887 keys=keys,
2888 dtype=dtype,
2889 schema=schema,
2890 )
2891 return str(table.sql_schema())
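# Usage sketch (illustrative only): with a raw sqlite3 connection pandas
# dispatches to SQLiteDatabase/SQLiteTable above instead of SQLAlchemy.
# The table name "demo_fallback" is hypothetical.
def _demo_sqlite_fallback():  # pragma: no cover
    import sqlite3

    import pandas as pd

    con = sqlite3.connect(":memory:")
    pd.DataFrame({"a": [1, 2]}).to_sql("demo_fallback", con, index=False)
    return pd.read_sql("SELECT * FROM demo_fallback", con)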
2894def get_schema(
2895 frame,
2896 name: str,
2897 keys=None,
2898 con=None,
2899 dtype: DtypeArg | None = None,
2900 schema: str | None = None,
2901) -> str:
2902 """
2903 Get the SQL db table schema for the given frame.
2905 Parameters
2906 ----------
2907 frame : DataFrame
2908 name : str
2909 name of SQL table
2910 keys : string or sequence, default: None
2911 columns to use as a primary key
2912 con: ADBC Connection, SQLAlchemy connectable, sqlite3 connection, default: None
2913 ADBC provides high performance I/O with native type support, where available.
2914 Using SQLAlchemy makes it possible to use any DB supported by that
2915 library.
2916 If a DBAPI2 object, only sqlite3 is supported.
2917 dtype : dict of column name to SQL type, default None
2918 Optional specifying the datatype for columns. The SQL type should
2919 be a SQLAlchemy type, or a string for sqlite3 fallback connection.
2920 schema: str, default: None
2921 Optional specifying the schema to be used in creating the table.
2922 """
2923 with pandasSQL_builder(con=con) as pandas_sql:
2924 return pandas_sql._create_sql_schema(
2925 frame, name, keys=keys, dtype=dtype, schema=schema
2926 )
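# Usage sketch (illustrative only): get_schema renders the CREATE TABLE
# statement without touching a database; with con=None the sqlite3 fallback
# dialect is used, so the column types come from _SQL_TYPES above. The
# frame contents and table name "demo_tbl" are hypothetical.
def _demo_get_schema():  # pragma: no cover
    import pandas as pd

    frame = pd.DataFrame({"a": [1], "b": ["x"]})
    return get_schema(frame, "demo_tbl")  # 'CREATE TABLE "demo_tbl" (...)'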