Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/io/sql.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Collection of query wrappers / abstractions to both facilitate data
3retrieval and to reduce dependency on DB-specific API.
4"""
6from __future__ import annotations
8from abc import (
9 ABC,
10 abstractmethod,
11)
12from contextlib import (
13 ExitStack,
14 contextmanager,
15)
16from datetime import (
17 date,
18 datetime,
19 time,
20)
21from functools import partial
22import re
23from typing import (
24 TYPE_CHECKING,
25 Any,
26 Iterator,
27 Literal,
28 cast,
29 overload,
30)
31import warnings
33import numpy as np
35from pandas._libs import lib
36from pandas._typing import (
37 DateTimeErrorChoices,
38 DtypeArg,
39 DtypeBackend,
40 IndexLabel,
41)
42from pandas.compat._optional import import_optional_dependency
43from pandas.errors import (
44 AbstractMethodError,
45 DatabaseError,
46)
47from pandas.util._exceptions import find_stack_level
48from pandas.util._validators import check_dtype_backend
50from pandas.core.dtypes.common import (
51 is_datetime64tz_dtype,
52 is_dict_like,
53 is_integer,
54 is_list_like,
55)
56from pandas.core.dtypes.dtypes import DatetimeTZDtype
57from pandas.core.dtypes.missing import isna
59from pandas import get_option
60from pandas.core.api import (
61 DataFrame,
62 Series,
63)
64from pandas.core.arrays import ArrowExtensionArray
65from pandas.core.base import PandasObject
66import pandas.core.common as com
67from pandas.core.internals.construction import convert_object_array
68from pandas.core.tools.datetimes import to_datetime
70if TYPE_CHECKING:
71 from sqlalchemy import Table
72 from sqlalchemy.sql.expression import (
73 Select,
74 TextClause,
75 )
78# -----------------------------------------------------------------------------
79# -- Helper functions
82def _process_parse_dates_argument(parse_dates):
83 """Process parse_dates argument for read_sql functions"""
84 # handle non-list entries for parse_dates gracefully
85 if parse_dates is True or parse_dates is None or parse_dates is False:
86 parse_dates = []
88 elif not hasattr(parse_dates, "__iter__"):
89 parse_dates = [parse_dates]
90 return parse_dates
93def _handle_date_column(
94 col, utc: bool = False, format: str | dict[str, Any] | None = None
95):
96 if isinstance(format, dict):
97 # GH35185 Allow custom error values in parse_dates argument of
98 # read_sql like functions.
99 # Format can take on custom to_datetime argument values such as
100 # {"errors": "coerce"} or {"dayfirst": True}
101 error: DateTimeErrorChoices = format.pop("errors", None) or "ignore"
102 return to_datetime(col, errors=error, **format)
103 else:
104 # Allow passing of formatting string for integers
105 # GH17855
106 if format is None and (
107 issubclass(col.dtype.type, np.floating)
108 or issubclass(col.dtype.type, np.integer)
109 ):
110 format = "s"
111 if format in ["D", "d", "h", "m", "s", "ms", "us", "ns"]:
112 return to_datetime(col, errors="coerce", unit=format, utc=utc)
113 elif is_datetime64tz_dtype(col.dtype):
114 # coerce to UTC timezone
115 # GH11216
116 return to_datetime(col, utc=True)
117 else:
118 return to_datetime(col, errors="coerce", format=format, utc=utc)
def _parse_date_columns(data_frame, parse_dates):
    """
    Force non-datetime columns to be read as such.
    Supports both string formatted and integer timestamp columns.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # we want to coerce datetime64_tz dtypes for now to UTC
    # we could in theory do a 'nice' conversion from a FixedOffset tz
    # GH11216
    for col_name, df_col in data_frame.items():
        if is_datetime64tz_dtype(df_col.dtype) or col_name in parse_dates:
            try:
                fmt = parse_dates[col_name]
            except (KeyError, TypeError):
                # TypeError: parse_dates is a list, not a mapping.
                # KeyError: a tz-aware column that is not listed in a
                # dict-valued parse_dates; previously this crashed instead of
                # falling back to the default conversion.
                fmt = None
            data_frame[col_name] = _handle_date_column(df_col, format=fmt)

    return data_frame
142def _convert_arrays_to_dataframe(
143 data,
144 columns,
145 coerce_float: bool = True,
146 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
147) -> DataFrame:
148 content = lib.to_object_array_tuples(data)
149 arrays = convert_object_array(
150 list(content.T),
151 dtype=None,
152 coerce_float=coerce_float,
153 dtype_backend=dtype_backend,
154 )
155 if dtype_backend == "pyarrow":
156 pa = import_optional_dependency("pyarrow")
157 arrays = [
158 ArrowExtensionArray(pa.array(arr, from_pandas=True)) for arr in arrays
159 ]
160 if arrays:
161 df = DataFrame(dict(zip(list(range(len(columns))), arrays)))
162 df.columns = columns
163 return df
164 else:
165 return DataFrame(columns=columns)
def _wrap_result(
    data,
    columns,
    index_col=None,
    coerce_float: bool = True,
    parse_dates=None,
    dtype: DtypeArg | None = None,
    dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
):
    """Assemble a query result set into a DataFrame.

    On top of the raw array-to-frame conversion this applies, in order:
    dtype casting, date-column parsing and index assignment.
    """
    frame = _convert_arrays_to_dataframe(data, columns, coerce_float, dtype_backend)

    if dtype:
        frame = frame.astype(dtype)

    frame = _parse_date_columns(frame, parse_dates)

    return frame if index_col is None else frame.set_index(index_col)
def execute(sql, con, params=None):
    """
    Execute the given SQL query using the provided connection object.

    .. deprecated::
        This helper is deprecated and will be removed (GH50185).

    Parameters
    ----------
    sql : string
        SQL query to be executed.
    con : SQLAlchemy connection or sqlite3 connection
        If a DBAPI2 object, only sqlite3 is supported.
    params : list or tuple, optional, default: None
        List of parameters to pass to execute method.

    Returns
    -------
    Results Iterable
    """
    warnings.warn(
        "`pandas.io.sql.execute` is deprecated and "
        "will be removed in the future version.",
        FutureWarning,
        stacklevel=find_stack_level(),
    )  # GH50185
    sa = import_optional_dependency("sqlalchemy", errors="ignore")

    # Engines and URI strings are rejected: a live connection is required here.
    if sa is not None and isinstance(con, (str, sa.engine.Engine)):
        raise TypeError("pandas.io.sql.execute requires a connection")  # GH50185
    with pandasSQL_builder(con, need_transaction=True) as pandas_sql:
        return pandas_sql.execute(sql, params)
222# -----------------------------------------------------------------------------
223# -- Read and write to DataFrames
@overload
def read_sql_table(
    table_name,
    con,
    schema=...,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    columns: list[str] | None = ...,
    chunksize: None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> DataFrame:
    # Typing overload: without a chunksize the whole table is one DataFrame.
    ...
@overload
def read_sql_table(
    table_name,
    con,
    schema=...,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    columns: list[str] | None = ...,
    chunksize: int = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> Iterator[DataFrame]:
    # Typing overload: an integer chunksize yields an iterator of DataFrames.
    ...
def read_sql_table(
    table_name: str,
    con,
    schema: str | None = None,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    parse_dates: list[str] | dict[str, str] | None = None,
    columns: list[str] | None = None,
    chunksize: int | None = None,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL database table into a DataFrame.

    Given a table name and a SQLAlchemy connectable, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : SQLAlchemy connectable or str
        A database URI could be provided as str.
        SQLite DBAPI connection mode not supported.
    schema : str, default None
        Name of SQL schema in database to query (if database flavor
        supports this). Uses default schema if None (default).
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default None
        List of column names to select from SQL table.
    chunksize : int, default None
        If specified, returns an iterator where `chunksize` is the number of
        rows to include in each chunk.
    dtype_backend : {"numpy_nullable", "pyarrow"}, defaults to NumPy backed DataFrames
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy
        arrays, nullable dtypes are used for all dtypes that have a nullable
        implementation when "numpy_nullable" is set, pyarrow is used for all
        dtypes if "pyarrow" is set.

        The dtype_backends are still experimental.

        .. versionadded:: 2.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]
        A SQL table is returned as two-dimensional data structure with labeled
        axes.

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information will be converted to UTC.

    Examples
    --------
    >>> pd.read_sql_table('table_name', 'postgres:///db_name')  # doctest:+SKIP
    """

    check_dtype_backend(dtype_backend)
    if dtype_backend is lib.no_default:
        # Callers that did not opt in keep the classic NumPy-backed frames.
        dtype_backend = "numpy"  # type: ignore[assignment]

    with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql:
        if not pandas_sql.has_table(table_name):
            raise ValueError(f"Table {table_name} not found")

        table = pandas_sql.read_table(
            table_name,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
            dtype_backend=dtype_backend,
        )

    if table is not None:
        return table
    else:
        # NOTE(review): `con` is passed as a second exception argument here,
        # unlike the identical message above — looks unintentional; verify.
        raise ValueError(f"Table {table_name} not found", con)
@overload
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params: list[str] | dict[str, str] | None = ...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    chunksize: None = ...,
    dtype: DtypeArg | None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> DataFrame:
    # Typing overload: without a chunksize the result is one DataFrame.
    ...
@overload
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params: list[str] | dict[str, str] | None = ...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    chunksize: int = ...,
    dtype: DtypeArg | None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
) -> Iterator[DataFrame]:
    # Typing overload: an integer chunksize yields an iterator of DataFrames.
    ...
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    params: list[str] | dict[str, str] | None = None,
    parse_dates: list[str] | dict[str, str] | None = None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query into a DataFrame.

    Returns a DataFrame corresponding to the result set of the query
    string. Optionally provide an `index_col` parameter to use one of the
    columns as the index, otherwise default integer index will be used.

    Parameters
    ----------
    sql : str SQL query or SQLAlchemy Selectable (select or text object)
        SQL query to be executed.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the number of
        rows to include in each chunk.
    dtype : Type name or dict of columns
        Data type for data or columns. E.g. np.float64 or
        {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.

        .. versionadded:: 1.3.0
    dtype_backend : {"numpy_nullable", "pyarrow"}, defaults to NumPy backed DataFrames
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy
        arrays, nullable dtypes are used for all dtypes that have a nullable
        implementation when "numpy_nullable" is set, pyarrow is used for all
        dtypes if "pyarrow" is set.

        The dtype_backends are still experimental.

        .. versionadded:: 2.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information parsed via the `parse_dates`
    parameter will be converted to UTC.
    """

    check_dtype_backend(dtype_backend)
    if dtype_backend is lib.no_default:
        # Callers that did not opt in keep the classic NumPy-backed frames.
        dtype_backend = "numpy"  # type: ignore[assignment]

    with pandasSQL_builder(con) as pandas_sql:
        return pandas_sql.read_query(
            sql,
            index_col=index_col,
            params=params,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            chunksize=chunksize,
            dtype=dtype,
            dtype_backend=dtype_backend,
        )
@overload
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns: list[str] = ...,
    chunksize: None = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
    # `...` instead of a concrete None default, matching the convention used
    # by every other @overload stub in this module.
    dtype: DtypeArg | None = ...,
) -> DataFrame:
    # Typing overload: without a chunksize the result is one DataFrame.
    ...
@overload
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns: list[str] = ...,
    chunksize: int = ...,
    dtype_backend: DtypeBackend | lib.NoDefault = ...,
    # `...` instead of a concrete None default, matching the convention used
    # by every other @overload stub in this module.
    dtype: DtypeArg | None = ...,
) -> Iterator[DataFrame]:
    # Typing overload: an integer chunksize yields an iterator of DataFrames.
    ...
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    columns: list[str] | None = None,
    chunksize: int | None = None,
    dtype_backend: DtypeBackend | lib.NoDefault = lib.no_default,
    dtype: DtypeArg | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query`` (for backward compatibility). It will delegate
    to the specific function depending on the provided input. A SQL query
    will be routed to ``read_sql_query``, while a database table name will
    be routed to ``read_sql_table``. Note that the delegated function might
    have more specific notes about their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable (select or text object)
        SQL query to be executed or a table name.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
        for engine disposal and connection closure for the SQLAlchemy connectable; str
        connections are closed automatically. See
        `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default: None
        List of column names to select from SQL table (only used when reading
        a table).
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the
        number of rows to include in each chunk.
    dtype_backend : {"numpy_nullable", "pyarrow"}, defaults to NumPy backed DataFrames
        Which dtype_backend to use, e.g. whether a DataFrame should have NumPy
        arrays, nullable dtypes are used for all dtypes that have a nullable
        implementation when "numpy_nullable" is set, pyarrow is used for all
        dtypes if "pyarrow" is set.

        The dtype_backends are still experimental.

        .. versionadded:: 2.0
    dtype : Type name or dict of columns
        Data type for data or columns. E.g. np.float64 or
        {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.
        The argument is ignored if a table is passed instead of a query.

        .. versionadded:: 2.0.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.

    Examples
    --------
    Read data from SQL via either a SQL query or a SQL tablename.
    When using a SQLite database only SQL queries are accepted,
    providing only the SQL tablename will result in an error.

    >>> from sqlite3 import connect
    >>> conn = connect(':memory:')
    >>> df = pd.DataFrame(data=[[0, '10/11/12'], [1, '12/11/10']],
    ...                   columns=['int_column', 'date_column'])
    >>> df.to_sql('test_data', conn)
    2

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data', conn)
       int_column date_column
    0           0    10/11/12
    1           1    12/11/10

    >>> pd.read_sql('test_data', 'postgres:///db_name')  # doctest:+SKIP

    Apply date parsing to columns through the ``parse_dates`` argument
    The ``parse_dates`` argument calls ``pd.to_datetime`` on the provided columns.
    Custom argument values for applying ``pd.to_datetime`` on a column are specified
    via a dictionary format:

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data',
    ...             conn,
    ...             parse_dates={"date_column": {"format": "%d/%m/%y"}})
       int_column date_column
    0           0  2012-11-10
    1           1  2010-11-12
    """

    check_dtype_backend(dtype_backend)
    if dtype_backend is lib.no_default:
        # Callers that did not opt in keep the classic NumPy-backed frames.
        dtype_backend = "numpy"  # type: ignore[assignment]

    with pandasSQL_builder(con) as pandas_sql:
        if isinstance(pandas_sql, SQLiteDatabase):
            # The sqlite3 fallback always treats `sql` as a query; table-name
            # routing is only attempted on the SQLAlchemy path below.
            return pandas_sql.read_query(
                sql,
                index_col=index_col,
                params=params,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                chunksize=chunksize,
                dtype_backend=dtype_backend,  # type: ignore[arg-type]
                dtype=dtype,
            )

        try:
            _is_table_name = pandas_sql.has_table(sql)
        except Exception:
            # using generic exception to catch errors from sql drivers (GH24988)
            _is_table_name = False

        if _is_table_name:
            return pandas_sql.read_table(
                sql,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                columns=columns,
                chunksize=chunksize,
                dtype_backend=dtype_backend,
            )
        else:
            return pandas_sql.read_query(
                sql,
                index_col=index_col,
                params=params,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                chunksize=chunksize,
                dtype_backend=dtype_backend,
                dtype=dtype,
            )
def to_sql(
    frame,
    name: str,
    con,
    schema: str | None = None,
    if_exists: Literal["fail", "replace", "append"] = "fail",
    index: bool = True,
    index_label: IndexLabel = None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
    method: str | None = None,
    engine: str = "auto",
    **engine_kwargs,
) -> int | None:
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame, Series
    name : str
        Name of SQL table.
    con : SQLAlchemy connectable(engine/connection) or database string URI
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : str, optional
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, raise a ValueError.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : bool, default True
        Write DataFrame index as a column.
    index_label : str or sequence, optional
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 fallback mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        - None : Uses standard SQL ``INSERT`` clause (one per row).
        - ``'multi'``: Pass multiple values in a single ``INSERT`` clause.
        - callable with signature ``(pd_table, conn, keys, data_iter) -> int | None``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    engine : {'auto', 'sqlalchemy'}, default 'auto'
        SQL engine library to use. If 'auto', then the option
        ``io.sql.engine`` is used. The default ``io.sql.engine``
        behavior is 'sqlalchemy'

        .. versionadded:: 1.3.0

    **engine_kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    None or int
        Number of rows affected by to_sql. None is returned if the callable
        passed into ``method`` does not return an integer number of rows.

        .. versionadded:: 1.4.0

    Notes
    -----
    The returned rows affected is the sum of the ``rowcount`` attribute of ``sqlite3.Cursor``
    or SQLAlchemy connectable. The returned value may not reflect the exact number of written
    rows as stipulated in the
    `sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
    `SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
    """  # noqa:E501
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")

    if isinstance(frame, Series):
        # A Series is written as a single-column table.
        frame = frame.to_frame()
    elif not isinstance(frame, DataFrame):
        raise NotImplementedError(
            "'frame' argument should be either a Series or a DataFrame"
        )

    with pandasSQL_builder(con, schema=schema, need_transaction=True) as pandas_sql:
        return pandas_sql.to_sql(
            frame,
            name,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            schema=schema,
            chunksize=chunksize,
            dtype=dtype,
            method=method,
            engine=engine,
            **engine_kwargs,
        )
def has_table(table_name: str, con, schema: str | None = None) -> bool:
    """
    Return True when the database contains a table with the given name.

    Parameters
    ----------
    table_name : str
        Name of the SQL table to look for.
    con : SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection
        SQLAlchemy allows any database supported by that library;
        for raw DBAPI2 objects only sqlite3 is supported.
    schema : str, default None
        Database schema to search (when the flavor supports schemas);
        the default schema is used when None.

    Returns
    -------
    bool
    """
    with pandasSQL_builder(con, schema=schema) as pandas_sql:
        result = pandas_sql.has_table(table_name)
    return result


table_exists = has_table
def pandasSQL_builder(
    con,
    schema: str | None = None,
    need_transaction: bool = False,
) -> PandasSQL:
    """Return the PandasSQL subclass matching the given connection object.

    sqlite3 connections (or ``con=None``) use the builtin SQLite fallback;
    SQLAlchemy connectables and URI strings use ``SQLDatabase``, which creates
    a connection and transaction when requested; any other DBAPI2 object falls
    back to the SQLite wrapper with a warning.
    """
    import sqlite3

    if con is None or isinstance(con, sqlite3.Connection):
        return SQLiteDatabase(con)

    sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

    if sqlalchemy is None and isinstance(con, str):
        raise ImportError("Using URI string without sqlalchemy installed.")

    if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
        return SQLDatabase(con, schema, need_transaction)

    warnings.warn(
        "pandas only supports SQLAlchemy connectable (engine/connection) or "
        "database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 "
        "objects are not tested. Please consider using SQLAlchemy.",
        UserWarning,
        stacklevel=find_stack_level(),
    )
    return SQLiteDatabase(con)
844class SQLTable(PandasObject):
845 """
846 For mapping Pandas tables to SQL tables.
847 Uses fact that table is reflected by SQLAlchemy to
848 do better type conversions.
849 Also holds various flags needed to avoid having to
850 pass them between functions all the time.
851 """
853 # TODO: support for multiIndex
    def __init__(
        self,
        name: str,
        pandas_sql_engine,
        frame=None,
        index: bool | str | list[str] | None = True,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        prefix: str = "pandas",
        index_label=None,
        schema=None,
        keys=None,
        dtype: DtypeArg | None = None,
    ) -> None:
        """Set up the table mapping; reflect the existing table when no frame is given."""
        self.name = name
        self.pd_sql = pandas_sql_engine  # the owning PandasSQL instance
        self.prefix = prefix
        self.frame = frame
        # Resolved list of index label(s) to write, or None for no index.
        self.index = self._index_name(index, index_label)
        self.schema = schema
        self.if_exists = if_exists
        self.keys = keys
        self.dtype = dtype

        if frame is not None:
            # We want to initialize based on a dataframe
            self.table = self._create_table_setup()
        else:
            # no data provided, read-only mode
            self.table = self.pd_sql.get_table(self.name, self.schema)

        if self.table is None:
            raise ValueError(f"Could not init table '{name}'")
    def exists(self):
        """Return True when the mapped table exists in the database."""
        return self.pd_sql.has_table(self.name, self.schema)
    def sql_schema(self) -> str:
        """Return the CREATE TABLE statement for this table as a string."""
        from sqlalchemy.schema import CreateTable

        return str(CreateTable(self.table).compile(self.pd_sql.con))
    def _execute_create(self) -> None:
        """Create the table in the database, inside a transaction."""
        # Inserting table into database, add to MetaData object
        self.table = self.table.to_metadata(self.pd_sql.meta)
        with self.pd_sql.run_transaction():
            self.table.create(bind=self.pd_sql.con)
902 def create(self) -> None:
903 if self.exists():
904 if self.if_exists == "fail":
905 raise ValueError(f"Table '{self.name}' already exists.")
906 if self.if_exists == "replace":
907 self.pd_sql.drop_table(self.name, self.schema)
908 self._execute_create()
909 elif self.if_exists == "append":
910 pass
911 else:
912 raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
913 else:
914 self._execute_create()
916 def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
917 """
918 Execute SQL statement inserting data
920 Parameters
921 ----------
922 conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
923 keys : list of str
924 Column names
925 data_iter : generator of list
926 Each item contains a list of values to be inserted
927 """
928 data = [dict(zip(keys, row)) for row in data_iter]
929 result = conn.execute(self.table.insert(), data)
930 return result.rowcount
932 def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
933 """
934 Alternative to _execute_insert for DBs support multivalue INSERT.
936 Note: multi-value insert is usually faster for analytics DBs
937 and tables containing a few columns
938 but performance degrades quickly with increase of columns.
939 """
941 from sqlalchemy import insert
943 data = [dict(zip(keys, row)) for row in data_iter]
944 stmt = insert(self.table).values(data)
945 result = conn.execute(stmt)
946 return result.rowcount
    def insert_data(self) -> tuple[list[str], list[np.ndarray]]:
        """Split the frame into column names and per-column object arrays.

        When an index is to be written it is first reset into regular columns.
        datetime64 columns become arrays of Python ``datetime`` objects,
        timedelta columns are stored as integers (see GH#6921), and NA values
        are replaced with None where the dtype allows.
        """
        if self.index is not None:
            temp = self.frame.copy()
            temp.index.names = self.index
            try:
                temp.reset_index(inplace=True)
            except ValueError as err:
                raise ValueError(f"duplicate name in index/columns: {err}") from err
        else:
            temp = self.frame

        column_names = list(map(str, temp.columns))
        ncols = len(column_names)
        # this just pre-allocates the list: None's will be replaced with ndarrays
        # error: List item 0 has incompatible type "None"; expected "ndarray"
        data_list: list[np.ndarray] = [None] * ncols  # type: ignore[list-item]

        for i, (_, ser) in enumerate(temp.items()):
            if ser.dtype.kind == "M":
                # datetime64 -> array of Python datetime objects
                d = ser.dt.to_pydatetime()
            elif ser.dtype.kind == "m":
                vals = ser._values
                if isinstance(vals, ArrowExtensionArray):
                    vals = vals.to_numpy(dtype=np.dtype("m8[ns]"))
                # store as integers, see GH#6921, GH#7076
                d = vals.view("i8").astype(object)
            else:
                d = ser._values.astype(object)

            assert isinstance(d, np.ndarray), type(d)

            if ser._can_hold_na:
                # Note: this will miss timedeltas since they are converted to int
                mask = isna(d)
                d[mask] = None

            data_list[i] = d

        return column_names, data_list
988 def insert(
989 self, chunksize: int | None = None, method: str | None = None
990 ) -> int | None:
991 # set insert method
992 if method is None:
993 exec_insert = self._execute_insert
994 elif method == "multi":
995 exec_insert = self._execute_insert_multi
996 elif callable(method):
997 exec_insert = partial(method, self)
998 else:
999 raise ValueError(f"Invalid parameter `method`: {method}")
1001 keys, data_list = self.insert_data()
1003 nrows = len(self.frame)
1005 if nrows == 0:
1006 return 0
1008 if chunksize is None:
1009 chunksize = nrows
1010 elif chunksize == 0:
1011 raise ValueError("chunksize argument should be non-zero")
1013 chunks = (nrows // chunksize) + 1
1014 total_inserted = None
1015 with self.pd_sql.run_transaction() as conn:
1016 for i in range(chunks):
1017 start_i = i * chunksize
1018 end_i = min((i + 1) * chunksize, nrows)
1019 if start_i >= end_i:
1020 break
1022 chunk_iter = zip(*(arr[start_i:end_i] for arr in data_list))
1023 num_inserted = exec_insert(conn, keys, chunk_iter)
1024 # GH 46891
1025 if is_integer(num_inserted):
1026 if total_inserted is None:
1027 total_inserted = num_inserted
1028 else:
1029 total_inserted += num_inserted
1030 return total_inserted
1032 def _query_iterator(
1033 self,
1034 result,
1035 exit_stack: ExitStack,
1036 chunksize: str | None,
1037 columns,
1038 coerce_float: bool = True,
1039 parse_dates=None,
1040 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
1041 ):
1042 """Return generator through chunked result set."""
1043 has_read_data = False
1044 with exit_stack:
1045 while True:
1046 data = result.fetchmany(chunksize)
1047 if not data:
1048 if not has_read_data:
1049 yield DataFrame.from_records(
1050 [], columns=columns, coerce_float=coerce_float
1051 )
1052 break
1054 has_read_data = True
1055 self.frame = _convert_arrays_to_dataframe(
1056 data, columns, coerce_float, dtype_backend
1057 )
1059 self._harmonize_columns(
1060 parse_dates=parse_dates, dtype_backend=dtype_backend
1061 )
1063 if self.index is not None:
1064 self.frame.set_index(self.index, inplace=True)
1066 yield self.frame
1068 def read(
1069 self,
1070 exit_stack: ExitStack,
1071 coerce_float: bool = True,
1072 parse_dates=None,
1073 columns=None,
1074 chunksize=None,
1075 dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
1076 ) -> DataFrame | Iterator[DataFrame]:
1077 from sqlalchemy import select
1079 if columns is not None and len(columns) > 0:
1080 cols = [self.table.c[n] for n in columns]
1081 if self.index is not None:
1082 for idx in self.index[::-1]:
1083 cols.insert(0, self.table.c[idx])
1084 sql_select = select(*cols)
1085 else:
1086 sql_select = select(self.table)
1087 result = self.pd_sql.execute(sql_select)
1088 column_names = result.keys()
1090 if chunksize is not None:
1091 return self._query_iterator(
1092 result,
1093 exit_stack,
1094 chunksize,
1095 column_names,
1096 coerce_float=coerce_float,
1097 parse_dates=parse_dates,
1098 dtype_backend=dtype_backend,
1099 )
1100 else:
1101 data = result.fetchall()
1102 self.frame = _convert_arrays_to_dataframe(
1103 data, column_names, coerce_float, dtype_backend
1104 )
1106 self._harmonize_columns(
1107 parse_dates=parse_dates, dtype_backend=dtype_backend
1108 )
1110 if self.index is not None:
1111 self.frame.set_index(self.index, inplace=True)
1113 return self.frame
1115 def _index_name(self, index, index_label):
1116 # for writing: index=True to include index in sql table
1117 if index is True:
1118 nlevels = self.frame.index.nlevels
1119 # if index_label is specified, set this as index name(s)
1120 if index_label is not None:
1121 if not isinstance(index_label, list):
1122 index_label = [index_label]
1123 if len(index_label) != nlevels:
1124 raise ValueError(
1125 "Length of 'index_label' should match number of "
1126 f"levels, which is {nlevels}"
1127 )
1128 return index_label
1129 # return the used column labels for the index columns
1130 if (
1131 nlevels == 1
1132 and "index" not in self.frame.columns
1133 and self.frame.index.name is None
1134 ):
1135 return ["index"]
1136 else:
1137 return com.fill_missing_names(self.frame.index.names)
1139 # for reading: index=(list of) string to specify column to set as index
1140 elif isinstance(index, str):
1141 return [index]
1142 elif isinstance(index, list):
1143 return index
1144 else:
1145 return None
1147 def _get_column_names_and_types(self, dtype_mapper):
1148 column_names_and_types = []
1149 if self.index is not None:
1150 for i, idx_label in enumerate(self.index):
1151 idx_type = dtype_mapper(self.frame.index._get_level_values(i))
1152 column_names_and_types.append((str(idx_label), idx_type, True))
1154 column_names_and_types += [
1155 (str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False)
1156 for i in range(len(self.frame.columns))
1157 ]
1159 return column_names_and_types
1161 def _create_table_setup(self):
1162 from sqlalchemy import (
1163 Column,
1164 PrimaryKeyConstraint,
1165 Table,
1166 )
1167 from sqlalchemy.schema import MetaData
1169 column_names_and_types = self._get_column_names_and_types(self._sqlalchemy_type)
1171 columns: list[Any] = [
1172 Column(name, typ, index=is_index)
1173 for name, typ, is_index in column_names_and_types
1174 ]
1176 if self.keys is not None:
1177 if not is_list_like(self.keys):
1178 keys = [self.keys]
1179 else:
1180 keys = self.keys
1181 pkc = PrimaryKeyConstraint(*keys, name=self.name + "_pk")
1182 columns.append(pkc)
1184 schema = self.schema or self.pd_sql.meta.schema
1186 # At this point, attach to new metadata, only attach to self.meta
1187 # once table is created.
1188 meta = MetaData()
1189 return Table(self.name, meta, *columns, schema=schema)
    def _harmonize_columns(
        self,
        parse_dates=None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> None:
        """
        Make the DataFrame's column types align with the SQL table
        column types.

        Need to work around limited NA value support. Floats are always
        fine, ints must always be floats if there are Null values.
        Booleans are hard because converting bool column with None replaces
        all Nones with false. Therefore only convert bool if there are no
        NA values.
        Datetimes should already be converted to np.datetime64 if supported,
        but here we also force conversion if required.

        Parameters
        ----------
        parse_dates : list or dict, optional
            Column names to parse as dates, optionally mapping to a format
            string or to_datetime kwargs; normalized below.
        dtype_backend : {'numpy', 'numpy_nullable', 'pyarrow'}, default 'numpy'
            Numeric conversions below only apply for the 'numpy' backend.
        """
        parse_dates = _process_parse_dates_argument(parse_dates)

        for sql_col in self.table.columns:
            col_name = sql_col.name
            try:
                df_col = self.frame[col_name]

                # Handle date parsing upfront; don't try to convert columns
                # twice
                if col_name in parse_dates:
                    try:
                        # dict form: per-column format / kwargs
                        fmt = parse_dates[col_name]
                    except TypeError:
                        # list form: no per-column format available
                        fmt = None
                    self.frame[col_name] = _handle_date_column(df_col, format=fmt)
                    continue

                # the type the dataframe column should have
                col_type = self._get_dtype(sql_col.type)

                if (
                    col_type is datetime
                    or col_type is date
                    or col_type is DatetimeTZDtype
                ):
                    # Convert tz-aware Datetime SQL columns to UTC
                    utc = col_type is DatetimeTZDtype
                    self.frame[col_name] = _handle_date_column(df_col, utc=utc)
                elif dtype_backend == "numpy" and col_type is float:
                    # floats support NA, can always convert!
                    self.frame[col_name] = df_col.astype(col_type, copy=False)

                elif dtype_backend == "numpy" and len(df_col) == df_col.count():
                    # No NA values, can convert ints and bools
                    if col_type is np.dtype("int64") or col_type is bool:
                        self.frame[col_name] = df_col.astype(col_type, copy=False)
            except KeyError:
                pass  # this column not in results
1246 def _sqlalchemy_type(self, col):
1247 dtype: DtypeArg = self.dtype or {}
1248 if is_dict_like(dtype):
1249 dtype = cast(dict, dtype)
1250 if col.name in dtype:
1251 return dtype[col.name]
1253 # Infer type of column, while ignoring missing values.
1254 # Needed for inserting typed data containing NULLs, GH 8778.
1255 col_type = lib.infer_dtype(col, skipna=True)
1257 from sqlalchemy.types import (
1258 TIMESTAMP,
1259 BigInteger,
1260 Boolean,
1261 Date,
1262 DateTime,
1263 Float,
1264 Integer,
1265 SmallInteger,
1266 Text,
1267 Time,
1268 )
1270 if col_type in ("datetime64", "datetime"):
1271 # GH 9086: TIMESTAMP is the suggested type if the column contains
1272 # timezone information
1273 try:
1274 if col.dt.tz is not None:
1275 return TIMESTAMP(timezone=True)
1276 except AttributeError:
1277 # The column is actually a DatetimeIndex
1278 # GH 26761 or an Index with date-like data e.g. 9999-01-01
1279 if getattr(col, "tz", None) is not None:
1280 return TIMESTAMP(timezone=True)
1281 return DateTime
1282 if col_type == "timedelta64":
1283 warnings.warn(
1284 "the 'timedelta' type is not supported, and will be "
1285 "written as integer values (ns frequency) to the database.",
1286 UserWarning,
1287 stacklevel=find_stack_level(),
1288 )
1289 return BigInteger
1290 elif col_type == "floating":
1291 if col.dtype == "float32":
1292 return Float(precision=23)
1293 else:
1294 return Float(precision=53)
1295 elif col_type == "integer":
1296 # GH35076 Map pandas integer to optimal SQLAlchemy integer type
1297 if col.dtype.name.lower() in ("int8", "uint8", "int16"):
1298 return SmallInteger
1299 elif col.dtype.name.lower() in ("uint16", "int32"):
1300 return Integer
1301 elif col.dtype.name.lower() == "uint64":
1302 raise ValueError("Unsigned 64 bit integer datatype is not supported")
1303 else:
1304 return BigInteger
1305 elif col_type == "boolean":
1306 return Boolean
1307 elif col_type == "date":
1308 return Date
1309 elif col_type == "time":
1310 return Time
1311 elif col_type == "complex":
1312 raise ValueError("Complex datatypes not supported")
1314 return Text
1316 def _get_dtype(self, sqltype):
1317 from sqlalchemy.types import (
1318 TIMESTAMP,
1319 Boolean,
1320 Date,
1321 DateTime,
1322 Float,
1323 Integer,
1324 )
1326 if isinstance(sqltype, Float):
1327 return float
1328 elif isinstance(sqltype, Integer):
1329 # TODO: Refine integer size.
1330 return np.dtype("int64")
1331 elif isinstance(sqltype, TIMESTAMP):
1332 # we have a timezone capable type
1333 if not sqltype.timezone:
1334 return datetime
1335 return DatetimeTZDtype
1336 elif isinstance(sqltype, DateTime):
1337 # Caution: np.datetime64 is also a subclass of np.number.
1338 return datetime
1339 elif isinstance(sqltype, Date):
1340 return date
1341 elif isinstance(sqltype, Boolean):
1342 return bool
1343 return object
class PandasSQL(PandasObject, ABC):
    """
    Abstract base for SQL backends.

    Subclasses should define read_query and to_sql.
    """

    def __enter__(self):
        # Context-manager support; cleanup is delegated to __exit__,
        # which subclasses may override.
        return self

    def __exit__(self, *args) -> None:
        pass

    def read_table(
        self,
        table_name: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """Read a whole table; only table-capable backends override this."""
        raise NotImplementedError

    @abstractmethod
    def read_query(
        self,
        sql: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """Read the result of a SQL query into a DataFrame (or chunks)."""
        pass

    @abstractmethod
    def to_sql(
        self,
        frame,
        name,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        index: bool = True,
        index_label=None,
        schema=None,
        chunksize=None,
        dtype: DtypeArg | None = None,
        method=None,
        engine: str = "auto",
        **engine_kwargs,
    ) -> int | None:
        """Write a DataFrame to a database table."""
        pass

    @abstractmethod
    def execute(self, sql: str | Select | TextClause, params=None):
        """Execute a raw SQL statement and return the backend's result."""
        pass

    @abstractmethod
    def has_table(self, name: str, schema: str | None = None) -> bool:
        """Return True if the named table exists."""
        pass

    @abstractmethod
    def _create_sql_schema(
        self,
        frame: DataFrame,
        table_name: str,
        keys: list[str] | None = None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ):
        """Return the CREATE TABLE statement for ``frame`` as a string."""
        pass
class BaseEngine:
    """
    Abstract insertion engine.

    Concrete engines (see ``SQLAlchemyEngine``) are resolved by name via
    ``get_engine`` and perform the actual row insertion.
    """

    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name,
        index: bool | str | list[str] | None = True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """
        Inserts data into already-prepared table
        """
        raise AbstractMethodError(self)
class SQLAlchemyEngine(BaseEngine):
    """Default engine: delegates insertion to ``SQLTable.insert`` via SQLAlchemy."""

    def __init__(self) -> None:
        import_optional_dependency(
            "sqlalchemy", extra="sqlalchemy is required for SQL support."
        )

    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name,
        index: bool | str | list[str] | None = True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """
        Insert ``frame`` into the already-prepared ``table``.

        Returns the driver-reported number of inserted rows when available.
        MySQL's "inf cannot be stored" failure is translated to ValueError.
        """
        from sqlalchemy import exc

        try:
            return table.insert(chunksize=chunksize, method=method)
        except exc.StatementError as err:
            # GH34431
            # https://stackoverflow.com/a/67358288/6067848
            msg = r"""(\(1054, "Unknown column 'inf(e0)?' in 'field list'"\))(?#
            )|inf can not be used with MySQL"""
            if re.search(msg, str(err.orig)):
                raise ValueError("inf cannot be used with MySQL") from err
            raise err
def get_engine(engine: str) -> BaseEngine:
    """
    Resolve an engine name ('auto' or 'sqlalchemy') to a BaseEngine instance.

    Raises
    ------
    ImportError
        If 'auto' resolution finds no usable engine.
    ValueError
        If an unknown engine name is given.
    """
    if engine == "auto":
        # defer to the configured default
        engine = get_option("io.sql.engine")

    if engine == "auto":
        # try engines in this order
        candidates = [SQLAlchemyEngine]

        error_msgs = ""
        for candidate in candidates:
            try:
                return candidate()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'sqlalchemy'.\n"
            "A suitable version of "
            "sqlalchemy is required for sql I/O "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "sqlalchemy":
        return SQLAlchemyEngine()

    raise ValueError("engine must be one of 'auto', 'sqlalchemy'")
class SQLDatabase(PandasSQL):
    """
    This class enables conversion between DataFrame and SQL databases
    using SQLAlchemy to handle DataBase abstraction.

    Parameters
    ----------
    con : SQLAlchemy Connectable or URI string.
        Connectable to connect with the database. Using SQLAlchemy makes it
        possible to use any DB supported by that library.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    need_transaction : bool, default False
        If True, SQLDatabase will create a transaction.

    """

    def __init__(
        self, con, schema: str | None = None, need_transaction: bool = False
    ) -> None:
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.schema import MetaData

        # self.exit_stack cleans up the Engine and Connection and commits the
        # transaction if any of those objects was created below.
        # Cleanup happens either in self.__exit__ or at the end of the iterator
        # returned by read_sql when chunksize is not None.
        self.exit_stack = ExitStack()
        if isinstance(con, str):
            con = create_engine(con)
            self.exit_stack.callback(con.dispose)
        if isinstance(con, Engine):
            con = self.exit_stack.enter_context(con.connect())
        if need_transaction and not con.in_transaction():
            self.exit_stack.enter_context(con.begin())
        self.con = con
        self.meta = MetaData(schema=schema)
        # set True when a chunked iterator takes over resource cleanup
        self.returns_generator = False

    def __exit__(self, *args) -> None:
        # When a chunked iterator was returned, its exit_stack closes the
        # resources instead (at iterator exhaustion).
        if not self.returns_generator:
            self.exit_stack.close()

    @contextmanager
    def run_transaction(self):
        # Begin a transaction only if one is not already active; always
        # yield the underlying connection.
        if not self.con.in_transaction():
            with self.con.begin():
                yield self.con
        else:
            yield self.con

    def execute(self, sql: str | Select | TextClause, params=None):
        """Simple passthrough to SQLAlchemy connectable"""
        args = [] if params is None else [params]
        if isinstance(sql, str):
            # plain strings bypass SQLAlchemy compilation
            return self.con.exec_driver_sql(sql, *args)
        return self.con.execute(sql, *args)

    def read_table(
        self,
        table_name: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL database table into a DataFrame.

        Parameters
        ----------
        table_name : str
            Name of SQL table in database.
        index_col : string, optional, default: None
            Column to set as index.
        coerce_float : bool, default True
            Attempts to convert values of non-string, non-numeric objects
            (like decimal.Decimal) to floating point. This can result in
            loss of precision.
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg}``, where the arg corresponds
              to the keyword arguments of :func:`pandas.to_datetime`.
              Especially useful with databases without native Datetime support,
              such as SQLite.
        columns : list, default: None
            List of column names to select from SQL table.
        schema : string, default None
            Name of SQL schema in database to query (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQL database object.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.
        dtype_backend : {"numpy_nullable", "pyarrow"}, defaults to NumPy dtypes
            Which dtype_backend to use, e.g. whether a DataFrame should have NumPy
            arrays, nullable dtypes are used for all dtypes that have a nullable
            implementation when "numpy_nullable" is set, pyarrow is used for all
            dtypes if "pyarrow" is set.

            The dtype_backends are still experimental.

            .. versionadded:: 2.0

        Returns
        -------
        DataFrame

        See Also
        --------
        pandas.read_sql_table
        SQLDatabase.read_query

        """
        self.meta.reflect(bind=self.con, only=[table_name])
        table = SQLTable(table_name, self, index=index_col, schema=schema)
        if chunksize is not None:
            # the returned iterator now owns resource cleanup
            self.returns_generator = True
        return table.read(
            self.exit_stack,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
            dtype_backend=dtype_backend,
        )

    @staticmethod
    def _query_iterator(
        result,
        exit_stack: ExitStack,
        chunksize: int,
        columns,
        index_col=None,
        coerce_float: bool = True,
        parse_dates=None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        with exit_stack:
            while True:
                data = result.fetchmany(chunksize)
                if not data:
                    if not has_read_data:
                        # always yield at least one (possibly empty) frame
                        yield _wrap_result(
                            [],
                            columns,
                            index_col=index_col,
                            coerce_float=coerce_float,
                            parse_dates=parse_dates,
                            dtype=dtype,
                            dtype_backend=dtype_backend,
                        )
                    break

                has_read_data = True
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                    dtype=dtype,
                    dtype_backend=dtype_backend,
                )

    def read_query(
        self,
        sql: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL query into a DataFrame.

        Parameters
        ----------
        sql : str
            SQL query to be executed.
        index_col : string, optional, default: None
            Column name to use as index for the returned DataFrame object.
        coerce_float : bool, default True
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        params : list, tuple or dict, optional, default: None
            List of parameters to pass to execute method. The syntax used
            to pass parameters is database driver dependent. Check your
            database driver documentation for which of the five syntax styles,
            described in PEP 249's paramstyle, is supported.
            Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg dict}``, where the arg dict
              corresponds to the keyword arguments of
              :func:`pandas.to_datetime` Especially useful with databases
              without native Datetime support, such as SQLite.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.
        dtype : Type name or dict of columns
            Data type for data or columns. E.g. np.float64 or
            {'a': np.float64, 'b': np.int32, 'c': 'Int64'}

            .. versionadded:: 1.3.0

        Returns
        -------
        DataFrame

        See Also
        --------
        read_sql_table : Read SQL database table into a DataFrame.
        read_sql

        """
        result = self.execute(sql, params)
        columns = result.keys()

        if chunksize is not None:
            # the returned iterator now owns resource cleanup
            self.returns_generator = True
            return self._query_iterator(
                result,
                self.exit_stack,
                chunksize,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
                dtype_backend=dtype_backend,
            )
        else:
            data = result.fetchall()
            frame = _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
                dtype_backend=dtype_backend,
            )
            return frame

    # alias kept for backwards compatibility
    read_sql = read_query

    def prep_table(
        self,
        frame,
        name,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        index: bool | str | list[str] | None = True,
        index_label=None,
        schema=None,
        dtype: DtypeArg | None = None,
    ) -> SQLTable:
        """
        Prepares table in the database for data insertion. Creates it if needed, etc.
        """
        if dtype:
            if not is_dict_like(dtype):
                # error: Value expression in dictionary comprehension has incompatible
                # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
                # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
                # Type[str], Type[float], Type[int], Type[complex], Type[bool],
                # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
                # dtype[Any], Type[object]]"
                dtype = {col_name: dtype for col_name in frame}  # type: ignore[misc]
            else:
                dtype = cast(dict, dtype)

            from sqlalchemy.types import TypeEngine

            # validate that every entry is a SQLAlchemy type or type class
            for col, my_type in dtype.items():
                if isinstance(my_type, type) and issubclass(my_type, TypeEngine):
                    pass
                elif isinstance(my_type, TypeEngine):
                    pass
                else:
                    raise ValueError(f"The type of {col} is not a SQLAlchemy type")

        table = SQLTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )
        table.create()
        return table

    def check_case_sensitive(
        self,
        name: str,
        schema: str | None,
    ) -> None:
        """
        Checks table name for issues with case-sensitivity.
        Method is called after data is inserted.
        """
        if not name.isdigit() and not name.islower():
            # check for potentially case sensitivity issues (GH7815)
            # Only check when name is not a number and name is not lower case
            from sqlalchemy import inspect as sqlalchemy_inspect

            insp = sqlalchemy_inspect(self.con)
            table_names = insp.get_table_names(schema=schema or self.meta.schema)
            if name not in table_names:
                msg = (
                    f"The provided table name '{name}' is not found exactly as "
                    "such in the database after writing the table, possibly "
                    "due to case sensitivity issues. Consider using lower "
                    "case table names."
                )
                warnings.warn(
                    msg,
                    UserWarning,
                    stacklevel=find_stack_level(),
                )

    def to_sql(
        self,
        frame,
        name: str,
        if_exists: Literal["fail", "replace", "append"] = "fail",
        index: bool = True,
        index_label=None,
        schema: str | None = None,
        chunksize=None,
        dtype: DtypeArg | None = None,
        method=None,
        engine: str = "auto",
        **engine_kwargs,
    ) -> int | None:
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table.
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column.
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : single type or dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a SQLAlchemy type. If all columns are of the same type, one
            single value can be used.
        method : {None, 'multi', callable}, default None
            Controls the SQL insertion clause used:

            * None : Uses standard SQL ``INSERT`` clause (one per row).
            * 'multi': Pass multiple values in a single ``INSERT`` clause.
            * callable with signature ``(pd_table, conn, keys, data_iter)``.

            Details and a sample callable implementation can be found in the
            section :ref:`insert method <io.sql.method>`.
        engine : {'auto', 'sqlalchemy'}, default 'auto'
            SQL engine library to use. If 'auto', then the option
            ``io.sql.engine`` is used. The default ``io.sql.engine``
            behavior is 'sqlalchemy'

            .. versionadded:: 1.3.0

        **engine_kwargs
            Any additional kwargs are passed to the engine.
        """
        sql_engine = get_engine(engine)

        table = self.prep_table(
            frame=frame,
            name=name,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )

        total_inserted = sql_engine.insert_records(
            table=table,
            con=self.con,
            frame=frame,
            name=name,
            index=index,
            schema=schema,
            chunksize=chunksize,
            method=method,
            **engine_kwargs,
        )

        self.check_case_sensitive(name=name, schema=schema)
        return total_inserted

    @property
    def tables(self):
        # tables known to the bound MetaData
        return self.meta.tables

    def has_table(self, name: str, schema: str | None = None) -> bool:
        from sqlalchemy import inspect as sqlalchemy_inspect

        insp = sqlalchemy_inspect(self.con)
        return insp.has_table(name, schema or self.meta.schema)

    def get_table(self, table_name: str, schema: str | None = None) -> Table:
        from sqlalchemy import (
            Numeric,
            Table,
        )

        schema = schema or self.meta.schema
        tbl = Table(table_name, self.meta, autoload_with=self.con, schema=schema)
        for column in tbl.columns:
            if isinstance(column.type, Numeric):
                # return floats instead of Decimal objects
                column.type.asdecimal = False
        return tbl

    def drop_table(self, table_name: str, schema: str | None = None) -> None:
        schema = schema or self.meta.schema
        if self.has_table(table_name, schema):
            self.meta.reflect(bind=self.con, only=[table_name], schema=schema)
            with self.run_transaction():
                self.get_table(table_name, schema).drop(bind=self.con)
            self.meta.clear()

    def _create_sql_schema(
        self,
        frame: DataFrame,
        table_name: str,
        keys: list[str] | None = None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ):
        # Build the table object without creating it and render its schema.
        table = SQLTable(
            table_name,
            self,
            frame=frame,
            index=False,
            keys=keys,
            dtype=dtype,
            schema=schema,
        )
        return str(table.sql_schema())
# ---- SQL without SQLAlchemy ---
# sqlite-specific sql strings and handler class
# dictionary used for readability purposes
# Maps inferred pandas dtypes to SQLite column type names. SQLite has no
# native boolean type, so booleans map to INTEGER.
_SQL_TYPES = {
    "string": "TEXT",
    "floating": "REAL",
    "integer": "INTEGER",
    "datetime": "TIMESTAMP",
    "date": "DATE",
    "time": "TIME",
    "boolean": "INTEGER",
}
2000def _get_unicode_name(name):
2001 try:
2002 uname = str(name).encode("utf-8", "strict").decode("utf-8")
2003 except UnicodeError as err:
2004 raise ValueError(f"Cannot convert identifier to UTF-8: '{name}'") from err
2005 return uname
def _get_valid_sqlite_name(name):
    # See https://stackoverflow.com/questions/6514274/how-do-you-escape-strings\
    # -for-sqlite-table-column-names-in-python
    # Ensure the string can be encoded as UTF-8.
    # Ensure the string does not include any NUL characters.
    # Replace all " with "".
    # Wrap the entire thing in double quotes.

    uname = _get_unicode_name(name)
    if not uname:
        raise ValueError("Empty table or column name specified")
    if "\x00" in uname:
        raise ValueError("SQLite identifier cannot contain NULs")
    escaped = uname.replace('"', '""')
    return f'"{escaped}"'
class SQLiteTable(SQLTable):
    """
    Patch the SQLTable for fallback support.
    Instead of a table variable just use the Create Table statement.
    """

    def __init__(self, *args, **kwargs) -> None:
        # GH 8341
        # register an adapter callable for datetime.time object
        import sqlite3

        # this will transform time(12,34,56,789) into '12:34:56.000789'
        # (this is what sqlalchemy does)
        def _adapt_time(t) -> str:
            # This is faster than strftime
            return f"{t.hour:02d}:{t.minute:02d}:{t.second:02d}.{t.microsecond:06d}"

        sqlite3.register_adapter(time, _adapt_time)
        super().__init__(*args, **kwargs)

    def sql_schema(self) -> str:
        """Return the full DDL as one string of ';'-joined statements."""
        # In this fallback subclass ``self.table`` holds the list of SQL
        # statement strings built by ``_create_table_setup`` (not a
        # SQLAlchemy Table object as in the parent class).
        return str(";\n".join(self.table))

    def _execute_create(self) -> None:
        """Create the table (and indexes) by executing each stored statement."""
        with self.pd_sql.run_transaction() as conn:
            for stmt in self.table:
                conn.execute(stmt)

    def insert_statement(self, *, num_rows: int) -> str:
        """
        Build a parameterized ``INSERT INTO`` statement for ``num_rows`` rows.

        Uses DBAPI qmark-style ``?`` placeholders, one ``(...)`` group per
        row, so multiple rows can be bound to a single ``execute`` call.
        """
        names = list(map(str, self.frame.columns))
        wld = "?"  # wildcard char
        escape = _get_valid_sqlite_name

        if self.index is not None:
            # Reversed iteration + insert(0, ...) puts the index level names
            # at the front in their original order.
            for idx in self.index[::-1]:
                names.insert(0, idx)

        bracketed_names = [escape(column) for column in names]
        col_names = ",".join(bracketed_names)

        row_wildcards = ",".join([wld] * len(names))
        wildcards = ",".join([f"({row_wildcards})" for _ in range(num_rows)])
        insert_statement = (
            f"INSERT INTO {escape(self.name)} ({col_names}) VALUES {wildcards}"
        )
        return insert_statement

    def _execute_insert(self, conn, keys, data_iter) -> int:
        """Insert rows one statement per row via executemany; return rowcount."""
        data_list = list(data_iter)
        conn.executemany(self.insert_statement(num_rows=1), data_list)
        return conn.rowcount

    def _execute_insert_multi(self, conn, keys, data_iter) -> int:
        """Insert all rows in one multi-VALUES statement; return rowcount."""
        data_list = list(data_iter)
        # One flat parameter sequence matching the (?,...),(?,...) groups.
        flattened_data = [x for row in data_list for x in row]
        conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data)
        return conn.rowcount

    def _create_table_setup(self):
        """
        Return a list of SQL statements that creates a table reflecting the
        structure of a DataFrame. The first entry will be a CREATE TABLE
        statement while the rest will be CREATE INDEX statements.
        """
        column_names_and_types = self._get_column_names_and_types(self._sql_type_name)
        escape = _get_valid_sqlite_name

        create_tbl_stmts = [
            escape(cname) + " " + ctype for cname, ctype, _ in column_names_and_types
        ]

        if self.keys is not None and len(self.keys):
            if not is_list_like(self.keys):
                keys = [self.keys]
            else:
                keys = self.keys
            cnames_br = ", ".join([escape(c) for c in keys])
            # NOTE(review): ``self.name`` is interpolated unescaped into the
            # constraint name; a table name that itself needs quoting would
            # yield invalid SQL here — confirm such names cannot reach this.
            create_tbl_stmts.append(
                f"CONSTRAINT {self.name}_pk PRIMARY KEY ({cnames_br})"
            )
        if self.schema:
            schema_name = self.schema + "."
        else:
            schema_name = ""
        create_stmts = [
            "CREATE TABLE "
            + schema_name
            + escape(self.name)
            + " (\n"
            + ",\n ".join(create_tbl_stmts)
            + "\n)"
        ]

        ix_cols = [cname for cname, _, is_index in column_names_and_types if is_index]
        if len(ix_cols):
            cnames = "_".join(ix_cols)
            cnames_br = ",".join([escape(c) for c in ix_cols])
            # NOTE(review): no space before "ON"; the statement still
            # tokenizes because the quoted index name ends with '"', but the
            # generated SQL reads as '"ix_…"ON' — confirm this is intended.
            create_stmts.append(
                "CREATE INDEX "
                + escape("ix_" + self.name + "_" + cnames)
                + "ON "
                + escape(self.name)
                + " ("
                + cnames_br
                + ")"
            )

        return create_stmts

    def _sql_type_name(self, col):
        """
        Map a column to its SQLite type name.

        An explicit entry in ``self.dtype`` wins; otherwise the type is
        inferred from the data (ignoring missing values) and translated via
        the module-level ``_SQL_TYPES`` mapping, defaulting to "string".
        """
        dtype: DtypeArg = self.dtype or {}
        if is_dict_like(dtype):
            dtype = cast(dict, dtype)
            if col.name in dtype:
                return dtype[col.name]

        # Infer type of column, while ignoring missing values.
        # Needed for inserting typed data containing NULLs, GH 8778.
        col_type = lib.infer_dtype(col, skipna=True)

        if col_type == "timedelta64":
            warnings.warn(
                "the 'timedelta' type is not supported, and will be "
                "written as integer values (ns frequency) to the database.",
                UserWarning,
                stacklevel=find_stack_level(),
            )
            col_type = "integer"

        elif col_type == "datetime64":
            col_type = "datetime"

        elif col_type == "empty":
            col_type = "string"

        elif col_type == "complex":
            raise ValueError("Complex datatypes not supported")

        if col_type not in _SQL_TYPES:
            col_type = "string"

        return _SQL_TYPES[col_type]
class SQLiteDatabase(PandasSQL):
    """
    Version of SQLDatabase to support SQLite connections (fallback without
    SQLAlchemy). This should only be used internally.

    Parameters
    ----------
    con : sqlite connection object
    """

    def __init__(self, con) -> None:
        # ``con`` is a DBAPI2-style (sqlite3) connection object.
        self.con = con

    @contextmanager
    def run_transaction(self):
        """
        Yield a cursor inside a transaction.

        Commits on clean exit, rolls back (and re-raises) on error; the
        cursor is closed in all cases.
        """
        cur = self.con.cursor()
        try:
            yield cur
            self.con.commit()
        except Exception:
            self.con.rollback()
            raise
        finally:
            cur.close()

    def execute(self, sql: str | Select | TextClause, params=None):
        """
        Execute ``sql`` on a fresh cursor and return that (open) cursor.

        Parameters
        ----------
        sql : str
            A plain SQL string; SQLAlchemy objects are rejected here.
        params : sequence or mapping, optional
            Bound parameters forwarded to ``cursor.execute``.

        Raises
        ------
        TypeError
            If ``sql`` is not a plain string.
        DatabaseError
            If execution fails; the connection is rolled back first.
        """
        if not isinstance(sql, str):
            raise TypeError("Query must be a string unless using sqlalchemy.")
        args = [] if params is None else [params]
        cur = self.con.cursor()
        try:
            cur.execute(sql, *args)
            return cur
        except Exception as exc:
            try:
                self.con.rollback()
            except Exception as inner_exc:  # pragma: no cover
                ex = DatabaseError(
                    f"Execution failed on sql: {sql}\n{exc}\nunable to rollback"
                )
                raise ex from inner_exc

            # The cursor is unusable after the failure; close it instead of
            # leaking it (best effort -- ignore drivers that refuse).
            try:
                cur.close()
            except Exception:
                pass

            ex = DatabaseError(f"Execution failed on sql '{sql}': {exc}")
            raise ex from exc

    @staticmethod
    def _query_iterator(
        cursor,
        chunksize: int,
        columns,
        index_col=None,
        coerce_float: bool = True,
        parse_dates=None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        while True:
            data = cursor.fetchmany(chunksize)
            # isinstance instead of type(...) == tuple: same behavior for the
            # drivers that return a tuple of rows, correct for subclasses too.
            if isinstance(data, tuple):
                data = list(data)
            if not data:
                cursor.close()
                if not has_read_data:
                    # No rows at all: still yield one empty frame so callers
                    # always receive at least one DataFrame.
                    result = DataFrame.from_records(
                        [], columns=columns, coerce_float=coerce_float
                    )
                    if dtype:
                        result = result.astype(dtype)
                    yield result
                break

            has_read_data = True
            yield _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
                dtype_backend=dtype_backend,
            )

    def read_query(
        self,
        sql,
        index_col=None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
        dtype_backend: DtypeBackend | Literal["numpy"] = "numpy",
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read a SQL query into a DataFrame, or into an iterator of DataFrames
        when ``chunksize`` is given (the cursor is then closed lazily by the
        iterator once it is exhausted).
        """
        cursor = self.execute(sql, params)
        columns = [col_desc[0] for col_desc in cursor.description]

        if chunksize is not None:
            return self._query_iterator(
                cursor,
                chunksize,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
                dtype_backend=dtype_backend,
            )
        else:
            data = self._fetchall_as_list(cursor)
            cursor.close()

            frame = _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
                dtype_backend=dtype_backend,
            )
            return frame

    def _fetchall_as_list(self, cur):
        """Fetch all remaining rows, coercing the result to a plain list."""
        result = cur.fetchall()
        if not isinstance(result, list):
            result = list(result)
        return result

    def to_sql(
        self,
        frame,
        name,
        if_exists: str = "fail",
        index: bool = True,
        index_label=None,
        schema=None,
        chunksize=None,
        dtype: DtypeArg | None = None,
        method=None,
        engine: str = "auto",
        **engine_kwargs,
    ) -> int | None:
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame: DataFrame
        name: string
            Name of SQL table.
        if_exists: {'fail', 'replace', 'append'}, default 'fail'
            fail: If table exists, do nothing.
            replace: If table exists, drop it, recreate it, and insert data.
            append: If table exists, insert data. Create if it does not exist.
        index : bool, default True
            Write DataFrame index as a column
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Ignored parameter included for compatibility with SQLAlchemy
            version of ``to_sql``.
        chunksize : int, default None
            If not None, then rows will be written in batches of this
            size at a time. If None, all rows will be written at once.
        dtype : single type or dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a string. If all columns are of the same type, one single value
            can be used.
        method : {None, 'multi', callable}, default None
            Controls the SQL insertion clause used:

            * None : Uses standard SQL ``INSERT`` clause (one per row).
            * 'multi': Pass multiple values in a single ``INSERT`` clause.
            * callable with signature ``(pd_table, conn, keys, data_iter)``.

            Details and a sample callable implementation can be found in the
            section :ref:`insert method <io.sql.method>`.

        Notes
        -----
        ``engine`` and ``engine_kwargs`` are accepted only for interface
        compatibility with the SQLAlchemy-backed ``SQLDatabase.to_sql`` and
        are ignored by this sqlite fallback.
        """
        if dtype:
            if not is_dict_like(dtype):
                # A single scalar type applies to every column.
                # error: Value expression in dictionary comprehension has incompatible
                # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
                # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
                # Type[str], Type[float], Type[int], Type[complex], Type[bool],
                # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
                # dtype[Any], Type[object]]"
                dtype = {col_name: dtype for col_name in frame}  # type: ignore[misc]
            else:
                dtype = cast(dict, dtype)

            # The sqlite fallback only understands SQL type names as strings.
            for col, my_type in dtype.items():
                if not isinstance(my_type, str):
                    raise ValueError(f"{col} ({my_type}) not a string")

        table = SQLiteTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            dtype=dtype,
        )
        table.create()
        return table.insert(chunksize, method)

    def has_table(self, name: str, schema: str | None = None) -> bool:
        """Return True if a table called ``name`` exists (``schema`` ignored)."""
        wld = "?"
        query = f"SELECT name FROM sqlite_master WHERE type='table' AND name={wld};"

        # Close the cursor after the lookup instead of leaking it.
        cur = self.execute(query, [name])
        try:
            return len(cur.fetchall()) > 0
        finally:
            cur.close()

    def get_table(self, table_name: str, schema: str | None = None) -> None:
        return None  # not supported in fallback mode

    def drop_table(self, name: str, schema: str | None = None) -> None:
        """Drop table ``name`` (quoted/escaped for SQLite)."""
        drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
        # ``execute`` returns an open cursor; close it since DROP has no rows.
        self.execute(drop_sql).close()

    def _create_sql_schema(
        self,
        frame,
        table_name: str,
        keys=None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ):
        """Build the CREATE TABLE (+ index) DDL for ``frame`` without executing it."""
        table = SQLiteTable(
            table_name,
            self,
            frame=frame,
            index=False,
            keys=keys,
            dtype=dtype,
            schema=schema,
        )
        return str(table.sql_schema())
def get_schema(
    frame,
    name: str,
    keys=None,
    con=None,
    dtype: DtypeArg | None = None,
    schema: str | None = None,
) -> str:
    """
    Get the SQL db table schema for the given frame.

    Parameters
    ----------
    frame : DataFrame
    name : str
        name of SQL table
    keys : string or sequence, default: None
        columns to use a primary key
    con: an open SQL database connection object or a SQLAlchemy connectable
        Using SQLAlchemy makes it possible to use any DB supported by that
        library, default: None
        If a DBAPI2 object, only sqlite3 is supported.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type, or a string for sqlite3 fallback connection.
    schema: str, default: None
        Optional specifying the schema to be used in creating the table.

        .. versionadded:: 1.2.0
    """
    # Delegate DDL generation to the connection-appropriate PandasSQL
    # implementation (SQLAlchemy-backed, or the sqlite3 fallback for
    # DBAPI2 connections / con=None).
    with pandasSQL_builder(con=con) as pandas_sql:
        ddl = pandas_sql._create_sql_schema(
            frame,
            name,
            keys=keys,
            dtype=dtype,
            schema=schema,
        )
    return ddl