

1# postgresql/psycopg2.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7r""" 

8.. dialect:: postgresql+psycopg2 

9 :name: psycopg2 

10 :dbapi: psycopg2 

11 :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: https://pypi.org/project/psycopg2/ 

13 

14psycopg2 Connect Arguments 

15-------------------------- 

16 

17Keyword arguments that are specific to the SQLAlchemy psycopg2 dialect 

18may be passed to :func:`_sa.create_engine()`, and include the following: 

19 

20 

21* ``isolation_level``: This option, available for all PostgreSQL dialects, 

22 includes the ``AUTOCOMMIT`` isolation level when using the psycopg2 

23 dialect. This option sets the **default** isolation level for the 

24 connection, applied immediately upon connection to the database and before

25 the connection is pooled. This option is generally superseded by the more 

26 modern :paramref:`_engine.Connection.execution_options.isolation_level` 

27 execution option, detailed at :ref:`dbapi_autocommit`. 

28 

29 .. seealso:: 

30 

31 :ref:`psycopg2_isolation_level` 

32 

33 :ref:`dbapi_autocommit` 

34 

35 

36* ``client_encoding``: sets the client encoding in a libpq-agnostic way, 

37 using psycopg2's ``set_client_encoding()`` method. 

38 

39 .. seealso:: 

40 

41 :ref:`psycopg2_unicode` 

42 

43* ``use_native_unicode``: Under Python 2 only, this can be set to False to 

44 disable the use of psycopg2's native Unicode support. 

45 

46 .. seealso:: 

47 

48 :ref:`psycopg2_disable_native_unicode` 

49 

50 

51* ``executemany_mode``, ``executemany_batch_page_size``, 

52 ``executemany_values_page_size``: Allows use of psycopg2 

53 extensions for optimizing "executemany"-style queries. See the referenced 

54 section below for details. 

55 

56 .. seealso:: 

57 

58 :ref:`psycopg2_executemany_mode` 

59 

60.. tip:: 

61 

62 The above keyword arguments are **dialect** keyword arguments, meaning 

63 that they are passed as explicit keyword arguments to :func:`_sa.create_engine()`:: 

64 

65 engine = create_engine( 

66 "postgresql+psycopg2://scott:tiger@localhost/test", 

67 isolation_level="SERIALIZABLE", 

68 ) 

69 

70 These should not be confused with **DBAPI** connect arguments, which 

71 are passed as part of the :paramref:`_sa.create_engine.connect_args` 

72 dictionary and/or are passed in the URL query string, as detailed in 

73 the section :ref:`custom_dbapi_args`. 

74 
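The distinction can be illustrated with a short sketch; the ``connect_timeout`` and
``application_name`` values below are libpq connection parameters chosen for
illustration and are not discussed elsewhere in this document::

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        # dialect keyword argument, consumed by SQLAlchemy itself
        isolation_level="SERIALIZABLE",
        # DBAPI connect arguments, passed through to psycopg2.connect()
        connect_args={"connect_timeout": 5, "application_name": "my_service"},
    )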

75.. _psycopg2_ssl: 

76 

77SSL Connections 

78--------------- 

79 

80The psycopg2 module has a connection argument named ``sslmode`` for 

81controlling its behavior regarding secure (SSL) connections. The default is 

82``sslmode=prefer``; it will attempt an SSL connection and if that fails it 

83will fall back to an unencrypted connection. ``sslmode=require`` may be used 

84to ensure that only secure connections are established. Consult the 

85psycopg2 / libpq documentation for further options that are available. 

86 

87Note that ``sslmode`` is specific to psycopg2 so it is included in the 

88connection URI:: 

89 

90 engine = sa.create_engine( 

91 "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require" 

92 ) 

93 
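Additional libpq SSL parameters may be supplied through
:paramref:`_sa.create_engine.connect_args` as well; the following sketch is
illustrative only, and the certificate path shown is a placeholder::

    engine = sa.create_engine(
        "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test",
        connect_args={
            "sslmode": "verify-full",
            # placeholder path to the CA certificate used to verify the server
            "sslrootcert": "/path/to/root.crt",
        },
    )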

94Unix Domain Connections 

95------------------------ 

96 

97psycopg2 supports connecting via Unix domain sockets. When the ``host``

98portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2, 

99which specifies Unix-domain communication rather than TCP/IP communication:: 

100 

101 create_engine("postgresql+psycopg2://user:password@/dbname") 

102 

103By default, the connection is made to the Unix-domain socket

104in ``/tmp``, or whatever socket directory was specified when PostgreSQL 

105was built. This value can be overridden by passing a pathname to psycopg2, 

106using ``host`` as an additional keyword argument:: 

107 

108 create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql") 

109 
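Equivalently, the socket directory may be supplied as a DBAPI connect argument
rather than in the URL; a brief sketch, using the same example directory::

    create_engine(
        "postgresql+psycopg2://user:password@/dbname",
        connect_args={"host": "/var/lib/postgresql"},
    )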

110.. seealso:: 

111 

112 `PQconnectdbParams \ 

113 <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_ 

114 

115.. _psycopg2_multi_host: 

116 

117Specifying multiple fallback hosts 

118----------------------------------- 

119 

120psycopg2 supports multiple connection points in the connection string. 

121When the ``host`` parameter is used multiple times in the query section of 

122the URL, SQLAlchemy will create a single string of the host and port 

123information provided to make the connections. Tokens may consist of 

124``host:port`` or just ``host``; in the latter case, the default port

125is selected by libpq. In the example below, three host connections

126are specified, for ``HostA:PortA``, ``HostB`` connecting to the default port,

127and ``HostC:PortC``::

128 

129 create_engine( 

130 "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC" 

131 ) 

132 

133As an alternative, the libpq query string format may also be used; this specifies

134``host`` and ``port`` as single query string arguments with comma-separated

135lists - the default port can be chosen by indicating an empty value

136in the comma-separated list::

137 

138 create_engine( 

139 "postgresql+psycopg2://user:password@/dbname?host=HostA,HostB,HostC&port=PortA,,PortC" 

140 ) 

141 

142With either URL style, connections to each host are attempted based on a

143configurable strategy, which may be configured using the libpq 

144``target_session_attrs`` parameter. Per libpq this defaults to ``any`` 

145which indicates a connection to each host is then attempted until a connection is successful. 

146Other strategies include ``primary``, ``prefer-standby``, etc. The complete 

147list is documented by PostgreSQL at 

148`libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_. 

149 

150For example, to indicate three hosts using the ``primary`` strategy::

151 

152 create_engine( 

153 "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC&target_session_attrs=primary" 

154 ) 

155 

156.. versionchanged:: 1.4.40 Port specification in psycopg2 multiple host format 

157 is repaired; previously, ports were not correctly interpreted in this context.

158 libpq comma-separated format is also now supported. 

159 

160.. versionadded:: 1.3.20 Support for multiple hosts in PostgreSQL connection 

161 string. 

162 

163.. seealso:: 

164 

165 `libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_ - please refer 

166 to this section in the libpq documentation for complete background on multiple host support. 

167 

168 

169Empty DSN Connections / Environment Variable Connections 

170--------------------------------------------------------- 

171 

172The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the 

173libpq client library, which by default indicates to connect to a localhost 

174PostgreSQL database that is open for "trust" connections. This behavior can be 

175further tailored using a particular set of environment variables which are 

176prefixed with ``PG...`` (e.g. ``PGHOST``), consumed by ``libpq`` to take the place of

177any or all elements of the connection string. 

178 

179For this form, the URL can be passed without any elements other than the 

180initial scheme:: 

181 

182 engine = create_engine('postgresql+psycopg2://') 

183 

184In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()`` 

185function which in turn represents an empty DSN passed to libpq. 

186 
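As a sketch, assuming a local server that accepts these placeholder credentials,
the connection parameters may be supplied entirely via standard libpq
environment variables such as ``PGHOST``, ``PGUSER``, ``PGPASSWORD`` and
``PGDATABASE``::

    import os

    os.environ["PGHOST"] = "localhost"
    os.environ["PGUSER"] = "scott"
    os.environ["PGPASSWORD"] = "tiger"
    os.environ["PGDATABASE"] = "test"

    # libpq reads the PG... variables when the empty DSN is connected
    engine = create_engine("postgresql+psycopg2://")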

187.. versionadded:: 1.3.2 Support for parameter-less connections with psycopg2.

188 

189.. seealso:: 

190 

191 `Environment Variables\ 

192 <https://www.postgresql.org/docs/current/libpq-envars.html>`_ - 

193 PostgreSQL documentation on how to use ``PG...``

194 environment variables for connections. 

195 

196.. _psycopg2_execution_options: 

197 

198Per-Statement/Connection Execution Options 

199------------------------------------------- 

200 

201The following DBAPI-specific options are respected when used with 

202:meth:`_engine.Connection.execution_options`, 

203:meth:`.Executable.execution_options`, 

204:meth:`_query.Query.execution_options`, 

205in addition to those not specific to DBAPIs: 

206 

207* ``isolation_level`` - Set the transaction isolation level for the lifespan 

208 of a :class:`_engine.Connection` (can only be set on a connection, 

209 not a statement 

210 or query). See :ref:`psycopg2_isolation_level`. 

211 

212* ``stream_results`` - Enable or disable usage of psycopg2 server-side

213 cursors - this feature makes use of "named" cursors in combination with 

214 special result handling methods so that result rows are not fully buffered. 

215 Defaults to False, meaning cursors are buffered by default. 

216 

217* ``max_row_buffer`` - when using ``stream_results``, an integer value that 

218 specifies the maximum number of rows to buffer at a time. This is 

219 interpreted by the :class:`.BufferedRowCursorResult`, and if omitted the 

220 buffer will grow to ultimately store 1000 rows at a time. 

221 

222 .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than 

223 1000, and the buffer will grow to that size. 

224 
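For example, a rough sketch that combines ``stream_results`` and
``max_row_buffer``, assuming an ``engine`` as created earlier and a
hypothetical ``large_table``::

    from sqlalchemy import text

    with engine.connect() as conn:
        result = conn.execution_options(
            stream_results=True, max_row_buffer=100
        ).execute(text("SELECT * FROM large_table"))
        for row in result:
            ...  # rows are fetched from the server-side cursor as needed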

225.. _psycopg2_batch_mode: 

226 

227.. _psycopg2_executemany_mode: 

228 

229Psycopg2 Fast Execution Helpers 

230------------------------------- 

231 

232Modern versions of psycopg2 include a group of features known as

233`Fast Execution Helpers \ 

234<https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which 

235have been shown in benchmarking to improve psycopg2's executemany() 

236performance, primarily with INSERT statements, by multiple orders of magnitude. 

237SQLAlchemy internally makes use of these extensions for ``executemany()`` style 

238calls, which correspond to lists of parameters being passed to 

239:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter 

240sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever 

241possible. 

242 

243The two available extensions on the psycopg2 side are the ``execute_values()`` 

244and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the 

245``execute_values()`` extension for all qualifying INSERT statements. 

246 

247.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode 

248 ``"values_only"`` for ``executemany_mode``, which allows an order of 

249 magnitude performance improvement for INSERT statements, but does not 

250 include "batch" mode for UPDATE and DELETE statements, since batch mode removes

251 the ability of ``cursor.rowcount`` to function correctly.

252 

253The use of these extensions is controlled by the ``executemany_mode`` flag 

254which may be passed to :func:`_sa.create_engine`:: 

255 

256 engine = create_engine( 

257 "postgresql+psycopg2://scott:tiger@host/dbname", 

258 executemany_mode='values_plus_batch') 

259 

260 

261Possible options for ``executemany_mode`` include: 

262 

263* ``values_only`` - this is the default value. The psycopg2 ``execute_values()``

264 extension is used for qualifying INSERT statements, which rewrites the INSERT 

265 to include multiple VALUES clauses so that many parameter sets can be 

266 inserted with one statement. 

267 

268 .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode`` 

269 which is also now the default. 

270 

271* ``None`` - No psycopg2 extensions are used, and the usual

272 ``cursor.executemany()`` method is used when invoking statements with 

273 multiple parameter sets. 

274 

275* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying 

276 INSERT, UPDATE and DELETE statements, so that multiple copies 

277 of a SQL query, each one corresponding to a parameter set passed to 

278 ``executemany()``, are joined into a single SQL string separated by a 

279 semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount` 

280 attribute will not contain a value for executemany-style executions. 

281 

282* ``'values_plus_batch'`` - ``execute_values`` is used for qualifying INSERT

283 statements, ``execute_batch`` is used for UPDATE and DELETE. 

284 When using this mode, the :attr:`_engine.CursorResult.rowcount` 

285 attribute will not contain a value for executemany-style executions against 

286 UPDATE and DELETE statements. 

287 

288By "qualifying statements", we mean that the statement being executed 

289must be a Core :func:`_expression.insert`, :func:`_expression.update` 

290or :func:`_expression.delete` construct, and not a plain textual SQL 

291string or one constructed using :func:`_expression.text`. When using the 

292ORM, all insert/update/delete statements used by the ORM flush process 

293are qualifying. 

294 
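For instance, a sketch of a qualifying call, assuming an ``engine`` as created
earlier and using a hypothetical ``user_account`` table; a Core
:func:`_expression.insert` executed with a list of parameter dictionaries
invokes the "executemany" code path described above::

    from sqlalchemy import column, table

    user_account = table("user_account", column("name"), column("fullname"))

    with engine.begin() as conn:
        # a list of parameter sets triggers the executemany path, which the
        # psycopg2 dialect routes to execute_values() by default
        conn.execute(
            user_account.insert(),
            [
                {"name": "spongebob", "fullname": "Spongebob Squarepants"},
                {"name": "sandy", "fullname": "Sandy Cheeks"},
            ],
        )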

295The "page size" for the "values" and "batch" strategies can be controlled

296by using the ``executemany_batch_page_size`` and 

297``executemany_values_page_size`` engine parameters. These 

298control how many parameter sets 

299should be represented in each execution. The "values" page size defaults 

300to 1000, which is different than psycopg2's default. The "batch" page

301size defaults to 100. These can be adjusted by passing new values to

302:func:`_engine.create_engine`:: 

303 

304 engine = create_engine( 

305 "postgresql+psycopg2://scott:tiger@host/dbname", 

306 executemany_mode='values_plus_batch',

307 executemany_values_page_size=10000, executemany_batch_page_size=500) 

308 

309.. versionchanged:: 1.4 

310 

311 The default for ``executemany_values_page_size`` is now 1000, up from 

312 100. 

313 

314.. seealso:: 

315 

316 :ref:`tutorial_multiple_parameters` - General information on using the 

317 :class:`_engine.Connection` 

318 object to execute statements in such a way as to make 

319 use of the DBAPI ``.executemany()`` method. 

320 

321 

322.. _psycopg2_unicode: 

323 

324Unicode with Psycopg2 

325---------------------- 

326 

327The psycopg2 DBAPI driver supports Unicode data transparently. Under Python 2 

328only, the SQLAlchemy psycopg2 dialect will enable the 

329``psycopg2.extensions.UNICODE`` extension by default to ensure Unicode is 

330handled properly; under Python 3, this is psycopg2's default behavior. 

331 

332The client character encoding can be controlled for the psycopg2 dialect 

333in the following ways: 

334 

335* For PostgreSQL 9.1 and above, the ``client_encoding`` parameter may be 

336 passed in the database URL; this parameter is consumed by the underlying 

337 ``libpq`` PostgreSQL client library:: 

338 

339 engine = create_engine("postgresql+psycopg2://user:pass@host/dbname?client_encoding=utf8") 

340 

341 Alternatively, the above ``client_encoding`` value may be passed using 

342 :paramref:`_sa.create_engine.connect_args` for programmatic establishment with 

343 ``libpq``:: 

344 

345 engine = create_engine( 

346 "postgresql+psycopg2://user:pass@host/dbname", 

347 connect_args={'client_encoding': 'utf8'} 

348 ) 

349 

350* For all PostgreSQL versions, psycopg2 supports a client-side encoding 

351 value that will be passed to database connections when they are first 

352 established. The SQLAlchemy psycopg2 dialect supports this using the 

353 ``client_encoding`` parameter passed to :func:`_sa.create_engine`:: 

354 

355 engine = create_engine( 

356 "postgresql+psycopg2://user:pass@host/dbname", 

357 client_encoding="utf8" 

358 ) 

359 

360 .. tip:: The above ``client_encoding`` parameter is admittedly very similar

361 in appearance to usage of the parameter within the 

362 :paramref:`_sa.create_engine.connect_args` dictionary; the difference 

363 above is that the parameter is consumed by psycopg2 and is 

364 passed to the database connection using ``SET client_encoding TO 

365 'utf8'``; in the previously mentioned style, the parameter is instead 

366 passed through psycopg2 and consumed by the ``libpq`` library. 

367 

368* A common way to set up client encoding with PostgreSQL databases is to 

369 ensure it is configured within the server-side postgresql.conf file; 

370 this is the recommended way to set the encoding for a server that

371 consistently uses one encoding in all databases::

372 

373 # postgresql.conf file 

374 

375 # client_encoding = sql_ascii # actually, defaults to database 

376 # encoding 

377 client_encoding = utf8 

378 

379.. _psycopg2_disable_native_unicode: 

380 

381Disabling Native Unicode 

382^^^^^^^^^^^^^^^^^^^^^^^^ 

383 

384Under Python 2 only, SQLAlchemy can also be instructed to skip the usage of the 

385psycopg2 ``UNICODE`` extension and to instead utilize its own unicode 

386encode/decode services, which are normally reserved only for those DBAPIs that 

387don't fully support unicode directly. Passing ``use_native_unicode=False`` to 

388:func:`_sa.create_engine` will disable usage of ``psycopg2.extensions. 

389UNICODE``. SQLAlchemy will instead encode data itself into Python bytestrings 

390on the way in and coerce from bytes on the way back, using the value of the 

391:func:`_sa.create_engine` ``encoding`` parameter, which defaults to ``utf-8``. 

392SQLAlchemy's own unicode encode/decode functionality is steadily becoming 

393obsolete as most DBAPIs now support unicode fully. 

394 
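A minimal sketch of the Python-2-only configuration described above (the URL
is a placeholder)::

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        use_native_unicode=False,
        encoding="utf-8",
    )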

395 

396Transactions 

397------------ 

398 

399The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations. 

400 
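A rough sketch of both features, assuming an ``engine`` as created earlier and
a hypothetical ``accounts`` table; SAVEPOINT is available through
:meth:`_engine.Connection.begin_nested` and two-phase commit through
:meth:`_engine.Connection.begin_twophase`::

    from sqlalchemy import text

    # SAVEPOINT via a nested transaction
    with engine.connect() as conn:
        with conn.begin():
            nested = conn.begin_nested()  # emits SAVEPOINT
            try:
                conn.execute(
                    text("UPDATE accounts SET balance = balance - 100")
                )
                nested.commit()  # emits RELEASE SAVEPOINT
            except Exception:
                nested.rollback()  # emits ROLLBACK TO SAVEPOINT

    # two-phase commit; the transaction id is generated by the dialect
    with engine.connect() as conn:
        xact = conn.begin_twophase()
        conn.execute(text("UPDATE accounts SET balance = balance + 100"))
        xact.prepare()  # emits PREPARE TRANSACTION
        xact.commit()  # emits COMMIT PREPARED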

401.. _psycopg2_isolation_level: 

402 

403Psycopg2 Transaction Isolation Level 

404------------------------------------- 

405 

406As discussed in :ref:`postgresql_isolation_level`,

407all PostgreSQL dialects support setting of transaction isolation level

408both via the ``isolation_level`` parameter passed to

409:func:`_sa.create_engine`,

410as well as via the ``isolation_level`` argument used by

411:meth:`_engine.Connection.execution_options`. When using the psycopg2

412dialect, these options make use of psycopg2's ``set_isolation_level()``

413connection method, rather than emitting a PostgreSQL directive; this is

414because psycopg2's API-level setting is always emitted at the start of

415each transaction in any

416case.

417 

418The psycopg2 dialect supports these constants for isolation level: 

419 

420* ``READ COMMITTED`` 

421* ``READ UNCOMMITTED`` 

422* ``REPEATABLE READ`` 

423* ``SERIALIZABLE`` 

424* ``AUTOCOMMIT`` 

425 
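As an illustrative sketch, assuming an ``engine`` as created earlier,
``AUTOCOMMIT`` may be selected engine-wide or for an individual connection;
the ``VACUUM`` statement shown is one example of a command that requires it::

    from sqlalchemy import text

    # engine-wide default isolation level
    autocommit_engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        isolation_level="AUTOCOMMIT",
    )

    # or per connection, via an execution option
    with engine.connect() as conn:
        conn = conn.execution_options(isolation_level="AUTOCOMMIT")
        conn.execute(text("VACUUM"))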

426.. seealso:: 

427 

428 :ref:`postgresql_isolation_level` 

429 

430 :ref:`pg8000_isolation_level` 

431 

432 

433NOTICE logging 

434--------------- 

435 

436The psycopg2 dialect will log PostgreSQL NOTICE messages 

437via the ``sqlalchemy.dialects.postgresql`` logger. When this logger 

438is set to the ``logging.INFO`` level, notice messages will be logged:: 

439 

440 import logging 

441 

442 logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO) 

443 

444Above, it is assumed that logging is configured externally. If this is not 

445the case, configuration such as ``logging.basicConfig()`` must be utilized:: 

446 

447 import logging 

448 

449 logging.basicConfig() # log messages to stdout 

450 logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO) 

451 

452.. seealso:: 

453 

454 `Logging HOWTO <https://docs.python.org/3/howto/logging.html>`_ - on the python.org website 

455 

456.. _psycopg2_hstore: 

457 

458HSTORE type 

459------------ 

460 

461The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of 

462the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension 

463by default when psycopg2 version 2.4 or greater is used, and 

464it is detected that the target database has the HSTORE type set up for use. 

465In other words, when the dialect makes the first 

466connection, a sequence like the following is performed: 

467 

4681. Request the available HSTORE oids using 

469 ``psycopg2.extras.HstoreAdapter.get_oids()``. 

470 If this function returns a list of HSTORE identifiers, we then determine 

471 that the ``HSTORE`` extension is present. 

472 This function is **skipped** if the version of psycopg2 installed is 

473 less than version 2.4. 

474 

4752. If the ``use_native_hstore`` flag is at its default of ``True``, and 

476 we've detected that ``HSTORE`` oids are available, the 

477 ``psycopg2.extras.register_hstore()`` extension is invoked for all

478 connections. 

479 

480The ``register_hstore()`` extension has the effect of **all Python 

481dictionaries being accepted as parameters regardless of the type of target 

482column in SQL**. The dictionaries are converted by this extension into a 

483textual HSTORE expression. If this behavior is not desired, disable the 

484use of the hstore extension by setting ``use_native_hstore`` to ``False`` as 

485follows:: 

486 

487 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", 

488 use_native_hstore=False) 

489 

490The ``HSTORE`` type is **still supported** when the 

491``psycopg2.extras.register_hstore()`` extension is not used. It merely

492means that the coercion between Python dictionaries and the HSTORE 

493string format, on both the parameter side and the result side, will take 

494place within SQLAlchemy's own marshalling logic, and not that of ``psycopg2``,

495which may be more performant. 

496 
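As a usage sketch, assuming an ``engine`` as created earlier and that the
``hstore`` extension is installed in the target database, a plain Python
dictionary may be persisted into an ``HSTORE`` column::

    from sqlalchemy import Column, Integer, MetaData, Table
    from sqlalchemy.dialects.postgresql import HSTORE

    metadata = MetaData()
    data_table = Table(
        "data_table",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("data", HSTORE),
    )

    with engine.begin() as conn:
        metadata.create_all(conn)
        conn.execute(
            data_table.insert(),
            {"data": {"key1": "value1", "key2": "value2"}},
        )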

497""" # noqa 

498from __future__ import absolute_import 

499 

500import decimal 

501import logging 

502import re 

503from uuid import UUID as _python_UUID 

504 

505from .array import ARRAY as PGARRAY 

506from .base import _ColonCast 

507from .base import _DECIMAL_TYPES 

508from .base import _FLOAT_TYPES 

509from .base import _INT_TYPES 

510from .base import ENUM 

511from .base import PGCompiler 

512from .base import PGDialect 

513from .base import PGExecutionContext 

514from .base import PGIdentifierPreparer 

515from .base import UUID 

516from .hstore import HSTORE 

517from .json import JSON 

518from .json import JSONB 

519from ... import exc 

520from ... import processors 

521from ... import types as sqltypes 

522from ... import util 

523from ...engine import cursor as _cursor 

524from ...util import collections_abc 

525 

526 

527logger = logging.getLogger("sqlalchemy.dialects.postgresql") 

528 

529 

530class _PGNumeric(sqltypes.Numeric): 

531 def bind_processor(self, dialect): 

532 return None 

533 

534 def result_processor(self, dialect, coltype): 

535 if self.asdecimal: 

536 if coltype in _FLOAT_TYPES: 

537 return processors.to_decimal_processor_factory( 

538 decimal.Decimal, self._effective_decimal_return_scale 

539 ) 

540 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 

541 # psycopg2 returns Decimal natively for NUMERIC (oid 1700)

542 return None 

543 else: 

544 raise exc.InvalidRequestError( 

545 "Unknown PG numeric type: %d" % coltype 

546 ) 

547 else: 

548 if coltype in _FLOAT_TYPES: 

549 # psycopg2 returns float natively for FLOAT8 (oid 701)

550 return None 

551 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 

552 return processors.to_float 

553 else: 

554 raise exc.InvalidRequestError( 

555 "Unknown PG numeric type: %d" % coltype 

556 ) 

557 

558 

559class _PGEnum(ENUM): 

560 def result_processor(self, dialect, coltype): 

561 if util.py2k and self._expect_unicode is True: 

562 # for py2k, if the enum type needs unicode data (which is set up as 

563 # part of the Enum() constructor based on values passed as py2k 

564 # unicode objects) we have to use our own converters since 

565 # psycopg2's don't work, a rare exception to the "modern DBAPIs 

566 # support unicode everywhere" theme of deprecating 

567 # convert_unicode=True. Use the special "force_nocheck" directive 

568 # which forces unicode conversion to happen on the Python side 

569 # without an isinstance() check. in py3k psycopg2 does the right 

570 # thing automatically. 

571 self._expect_unicode = "force_nocheck" 

572 return super(_PGEnum, self).result_processor(dialect, coltype) 

573 

574 

575class _PGHStore(HSTORE): 

576 def bind_processor(self, dialect): 

577 if dialect._has_native_hstore: 

578 return None 

579 else: 

580 return super(_PGHStore, self).bind_processor(dialect) 

581 

582 def result_processor(self, dialect, coltype): 

583 if dialect._has_native_hstore: 

584 return None 

585 else: 

586 return super(_PGHStore, self).result_processor(dialect, coltype) 

587 

588 

589class _PGARRAY(PGARRAY): 

590 def bind_expression(self, bindvalue): 

591 return _ColonCast(bindvalue, self) 

592 

593 

594class _PGJSON(JSON): 

595 def result_processor(self, dialect, coltype): 

596 return None 

597 

598 

599class _PGJSONB(JSONB): 

600 def result_processor(self, dialect, coltype): 

601 return None 

602 

603 

604class _PGUUID(UUID): 

605 def bind_processor(self, dialect): 

606 if not self.as_uuid and dialect.use_native_uuid: 

607 

608 def process(value): 

609 if value is not None: 

610 value = _python_UUID(value) 

611 return value 

612 

613 return process 

614 

615 def result_processor(self, dialect, coltype): 

616 if not self.as_uuid and dialect.use_native_uuid: 

617 

618 def process(value): 

619 if value is not None: 

620 value = str(value) 

621 return value 

622 

623 return process 

624 

625 

626_server_side_id = util.counter() 

627 

628 

629class PGExecutionContext_psycopg2(PGExecutionContext): 

630 _psycopg2_fetched_rows = None 

631 

632 def create_server_side_cursor(self): 

633 # use server-side cursors: 

634 # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html 

635 ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:]) 

636 return self._dbapi_connection.cursor(ident) 

637 

638 def post_exec(self): 

639 if ( 

640 self._psycopg2_fetched_rows 

641 and self.compiled 

642 and self.compiled.returning 

643 ): 

644 # psycopg2 execute_values will provide for a real cursor where 

645 # cursor.description works correctly. however, it executes the 

646 # INSERT statement multiple times for multiple pages of rows, so 

647 # while this cursor also supports calling .fetchall() directly, in 

648 # order to get the list of all rows inserted across multiple pages, 

649 # we have to retrieve the aggregated list from the execute_values() 

650 # function directly. 

651 strat_cls = _cursor.FullyBufferedCursorFetchStrategy 

652 self.cursor_fetch_strategy = strat_cls( 

653 self.cursor, initial_buffer=self._psycopg2_fetched_rows 

654 ) 

655 self._log_notices(self.cursor) 

656 

657 def _log_notices(self, cursor): 

658 # check also that notices is an iterable, after it's already 

659 # established that we will be iterating through it. This is to get 

660 # around test suites such as SQLAlchemy's using a Mock object for 

661 # cursor 

662 if not cursor.connection.notices or not isinstance( 

663 cursor.connection.notices, collections_abc.Iterable 

664 ): 

665 return 

666 

667 for notice in cursor.connection.notices: 

668 # NOTICE messages have a 

669 # newline character at the end 

670 logger.info(notice.rstrip()) 

671 

672 cursor.connection.notices[:] = [] 

673 

674 

675class PGCompiler_psycopg2(PGCompiler): 

676 pass 

677 

678 

679class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): 

680 pass 

681 

682 

683EXECUTEMANY_PLAIN = util.symbol("executemany_plain", canonical=0) 

684EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1) 

685EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2) 

686EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol( 

687 "executemany_values_plus_batch", 

688 canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES, 

689) 

690 

691 

692class PGDialect_psycopg2(PGDialect): 

693 driver = "psycopg2" 

694 

695 supports_statement_cache = True 

696 

697 if util.py2k: 

698 # turn off supports_unicode_statements for Python 2. psycopg2 supports 

699 # unicode statements in Py2K. But! it does not support unicode *bound 

700 # parameter names* because it uses the Python "%" operator to 

701 # interpolate these into the string, and this fails. So for Py2K, we 

702 # have to use full-on encoding for statements and parameters before 

703 # passing to cursor.execute(). 

704 supports_unicode_statements = False 

705 

706 supports_server_side_cursors = True 

707 

708 default_paramstyle = "pyformat" 

709 # set to true based on psycopg2 version 

710 supports_sane_multi_rowcount = False 

711 execution_ctx_cls = PGExecutionContext_psycopg2 

712 statement_compiler = PGCompiler_psycopg2 

713 preparer = PGIdentifierPreparer_psycopg2 

714 psycopg2_version = (0, 0) 

715 

716 _has_native_hstore = True 

717 

718 engine_config_types = PGDialect.engine_config_types.union( 

719 {"use_native_unicode": util.asbool} 

720 ) 

721 

722 colspecs = util.update_copy( 

723 PGDialect.colspecs, 

724 { 

725 sqltypes.Numeric: _PGNumeric, 

726 ENUM: _PGEnum, # needs force_unicode 

727 sqltypes.Enum: _PGEnum, # needs force_unicode 

728 HSTORE: _PGHStore, 

729 JSON: _PGJSON, 

730 sqltypes.JSON: _PGJSON, 

731 JSONB: _PGJSONB, 

732 UUID: _PGUUID, 

733 sqltypes.ARRAY: _PGARRAY, 

734 }, 

735 ) 

736 

737 def __init__( 

738 self, 

739 use_native_unicode=True, 

740 client_encoding=None, 

741 use_native_hstore=True, 

742 use_native_uuid=True, 

743 executemany_mode="values_only", 

744 executemany_batch_page_size=100, 

745 executemany_values_page_size=1000, 

746 **kwargs 

747 ): 

748 PGDialect.__init__(self, **kwargs) 

749 self.use_native_unicode = use_native_unicode 

750 if not use_native_unicode and not util.py2k: 

751 raise exc.ArgumentError( 

752 "psycopg2 native_unicode mode is required under Python 3" 

753 ) 

754 if not use_native_hstore: 

755 self._has_native_hstore = False 

756 self.use_native_hstore = use_native_hstore 

757 self.use_native_uuid = use_native_uuid 

758 self.supports_unicode_binds = use_native_unicode 

759 self.client_encoding = client_encoding 

760 

761 # Parse executemany_mode argument, allowing it to be only one of the 

762 # symbol names 

763 self.executemany_mode = util.symbol.parse_user_argument( 

764 executemany_mode, 

765 { 

766 EXECUTEMANY_PLAIN: [None], 

767 EXECUTEMANY_BATCH: ["batch"], 

768 EXECUTEMANY_VALUES: ["values_only"], 

769 EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"], 

770 }, 

771 "executemany_mode", 

772 ) 

773 

774 if self.executemany_mode & EXECUTEMANY_VALUES: 

775 self.insert_executemany_returning = True 

776 

777 self.executemany_batch_page_size = executemany_batch_page_size 

778 self.executemany_values_page_size = executemany_values_page_size 

779 

780 if self.dbapi and hasattr(self.dbapi, "__version__"): 

781 m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) 

782 if m: 

783 self.psycopg2_version = tuple( 

784 int(x) for x in m.group(1, 2, 3) if x is not None 

785 ) 

786 

787 if self.psycopg2_version < (2, 7): 

788 raise ImportError( 

789 "psycopg2 version 2.7 or higher is required." 

790 ) 

791 

792 def initialize(self, connection): 

793 super(PGDialect_psycopg2, self).initialize(connection) 

794 self._has_native_hstore = ( 

795 self.use_native_hstore 

796 and self._hstore_oids(connection.connection) is not None 

797 ) 

798 

799 # PGDialect.initialize() checks server version for <= 8.2 and sets 

800 # this flag to False if so 

801 if not self.full_returning: 

802 self.insert_executemany_returning = False 

803 self.executemany_mode = EXECUTEMANY_PLAIN 

804 

805 self.supports_sane_multi_rowcount = not ( 

806 self.executemany_mode & EXECUTEMANY_BATCH 

807 ) 

808 

809 @classmethod 

810 def dbapi(cls): 

811 import psycopg2 

812 

813 return psycopg2 

814 

815 @classmethod 

816 def _psycopg2_extensions(cls): 

817 from psycopg2 import extensions 

818 

819 return extensions 

820 

821 @classmethod 

822 def _psycopg2_extras(cls): 

823 from psycopg2 import extras 

824 

825 return extras 

826 

827 @util.memoized_property 

828 def _isolation_lookup(self): 

829 extensions = self._psycopg2_extensions() 

830 return { 

831 "AUTOCOMMIT": extensions.ISOLATION_LEVEL_AUTOCOMMIT, 

832 "READ COMMITTED": extensions.ISOLATION_LEVEL_READ_COMMITTED, 

833 "READ UNCOMMITTED": extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, 

834 "REPEATABLE READ": extensions.ISOLATION_LEVEL_REPEATABLE_READ, 

835 "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE, 

836 } 

837 

838 def set_isolation_level(self, connection, level): 

839 try: 

840 level = self._isolation_lookup[level.replace("_", " ")] 

841 except KeyError as err: 

842 util.raise_( 

843 exc.ArgumentError( 

844 "Invalid value '%s' for isolation_level. " 

845 "Valid isolation levels for %s are %s" 

846 % (level, self.name, ", ".join(self._isolation_lookup)) 

847 ), 

848 replace_context=err, 

849 ) 

850 

851 connection.set_isolation_level(level) 

852 

853 def set_readonly(self, connection, value): 

854 connection.readonly = value 

855 

856 def get_readonly(self, connection): 

857 return connection.readonly 

858 

859 def set_deferrable(self, connection, value): 

860 connection.deferrable = value 

861 

862 def get_deferrable(self, connection): 

863 return connection.deferrable 

864 

865 def do_ping(self, dbapi_connection): 

866 cursor = None 

867 before_autocommit = dbapi_connection.autocommit 

868 try: 

869 if not before_autocommit: 

870 dbapi_connection.autocommit = True 

871 cursor = dbapi_connection.cursor() 

872 try: 

873 cursor.execute(self._dialect_specific_select_one) 

874 finally: 

875 cursor.close() 

876 if not before_autocommit and not dbapi_connection.closed: 

877 dbapi_connection.autocommit = before_autocommit 

878 except self.dbapi.Error as err: 

879 if self.is_disconnect(err, dbapi_connection, cursor): 

880 return False 

881 else: 

882 raise 

883 else: 

884 return True 

885 

886 def on_connect(self): 

887 extras = self._psycopg2_extras() 

888 extensions = self._psycopg2_extensions() 

889 

890 fns = [] 

891 if self.client_encoding is not None: 

892 

893 def on_connect(conn): 

894 conn.set_client_encoding(self.client_encoding) 

895 

896 fns.append(on_connect) 

897 

898 if self.isolation_level is not None: 

899 

900 def on_connect(conn): 

901 self.set_isolation_level(conn, self.isolation_level) 

902 

903 fns.append(on_connect) 

904 

905 if self.dbapi and self.use_native_uuid: 

906 

907 def on_connect(conn): 

908 extras.register_uuid(None, conn) 

909 

910 fns.append(on_connect) 

911 

912 if util.py2k and self.dbapi and self.use_native_unicode: 

913 

914 def on_connect(conn): 

915 extensions.register_type(extensions.UNICODE, conn) 

916 extensions.register_type(extensions.UNICODEARRAY, conn) 

917 

918 fns.append(on_connect) 

919 

920 if self.dbapi and self.use_native_hstore: 

921 

922 def on_connect(conn): 

923 hstore_oids = self._hstore_oids(conn) 

924 if hstore_oids is not None: 

925 oid, array_oid = hstore_oids 

926 kw = {"oid": oid} 

927 if util.py2k: 

928 kw["unicode"] = True 

929 kw["array_oid"] = array_oid 

930 extras.register_hstore(conn, **kw) 

931 

932 fns.append(on_connect) 

933 

934 if self.dbapi and self._json_deserializer: 

935 

936 def on_connect(conn): 

937 extras.register_default_json( 

938 conn, loads=self._json_deserializer 

939 ) 

940 extras.register_default_jsonb( 

941 conn, loads=self._json_deserializer 

942 ) 

943 

944 fns.append(on_connect) 

945 

946 if fns: 

947 

948 def on_connect(conn): 

949 for fn in fns: 

950 fn(conn) 

951 

952 return on_connect 

953 else: 

954 return None 

955 

956 def do_executemany(self, cursor, statement, parameters, context=None): 

957 if ( 

958 self.executemany_mode & EXECUTEMANY_VALUES 

959 and context 

960 and context.isinsert 

961 and context.compiled._is_safe_for_fast_insert_values_helper 

962 ): 

963 executemany_values = ( 

964 "(%s)" % context.compiled.insert_single_values_expr 

965 ) 

966 if not self.supports_unicode_statements: 

967 executemany_values = executemany_values.encode(self.encoding) 

968 

969 # guard for statement that was altered via event hook or similar 

970 if executemany_values not in statement: 

971 executemany_values = None 

972 else: 

973 executemany_values = None 

974 

975 if executemany_values: 

976 statement = statement.replace(executemany_values, "%s") 

977 if self.executemany_values_page_size: 

978 kwargs = {"page_size": self.executemany_values_page_size} 

979 else: 

980 kwargs = {} 

981 xtras = self._psycopg2_extras() 

982 context._psycopg2_fetched_rows = xtras.execute_values( 

983 cursor, 

984 statement, 

985 parameters, 

986 template=executemany_values, 

987 fetch=bool(context.compiled.returning), 

988 **kwargs 

989 ) 

990 

991 elif self.executemany_mode & EXECUTEMANY_BATCH: 

992 if self.executemany_batch_page_size: 

993 kwargs = {"page_size": self.executemany_batch_page_size} 

994 else: 

995 kwargs = {} 

996 self._psycopg2_extras().execute_batch( 

997 cursor, statement, parameters, **kwargs 

998 ) 

999 else: 

1000 cursor.executemany(statement, parameters) 

1001 

1002 @util.memoized_instancemethod 

1003 def _hstore_oids(self, conn): 

1004 extras = self._psycopg2_extras() 

1005 if hasattr(conn, "dbapi_connection"): 

1006 conn = conn.dbapi_connection 

1007 oids = extras.HstoreAdapter.get_oids(conn) 

1008 if oids is not None and oids[0]: 

1009 return oids[0:2] 

1010 else: 

1011 return None 

1012 

1013 def create_connect_args(self, url): 

1014 opts = url.translate_connect_args(username="user") 

1015 

1016 is_multihost = False 

1017 if "host" in url.query: 

1018 is_multihost = isinstance(url.query["host"], (list, tuple)) 

1019 

1020 if opts or url.query: 

1021 if not opts: 

1022 opts = {} 

1023 if "port" in opts: 

1024 opts["port"] = int(opts["port"]) 

1025 opts.update(url.query) 

1026 if is_multihost: 

1027 hosts, ports = zip( 

1028 *[ 

1029 token.split(":") if ":" in token else (token, "") 

1030 for token in url.query["host"] 

1031 ] 

1032 ) 

1033 opts["host"] = ",".join(hosts) 

1034 if "port" in opts: 

1035 raise exc.ArgumentError( 

1036 "Can't mix 'multihost' formats together; use " 

1037 '"host=h1,h2,h3&port=p1,p2,p3" or ' 

1038 '"host=h1:p1&host=h2:p2&host=h3:p3" separately' 

1039 ) 

1040 opts["port"] = ",".join(ports) 

1041 return ([], opts) 

1042 else: 

1043 # no connection arguments whatsoever; psycopg2.connect() 

1044 # requires that "dsn" be present as a blank string. 

1045 return ([""], opts) 

1046 

1047 def is_disconnect(self, e, connection, cursor): 

1048 if isinstance(e, self.dbapi.Error): 

1049 # check the "closed" flag. this might not be 

1050 # present on old psycopg2 versions. Also, 

1051 # this flag doesn't actually help in a lot of disconnect 

1052 # situations, so don't rely on it. 

1053 if getattr(connection, "closed", False): 

1054 return True 

1055 

1056 # checks based on strings. in the case that .closed 

1057 # didn't cut it, fall back onto these. 

1058 str_e = str(e).partition("\n")[0] 

1059 for msg in [ 

1060 # these error messages from libpq: interfaces/libpq/fe-misc.c 

1061 # and interfaces/libpq/fe-secure.c. 

1062 "terminating connection", 

1063 "closed the connection", 

1064 "connection not open", 

1065 "could not receive data from server", 

1066 "could not send data to server", 

1067 # psycopg2 client errors, psycopg2/connection.h, 

1068 # psycopg2/cursor.h 

1069 "connection already closed", 

1070 "cursor already closed", 

1071 # not sure where this path is originally from, it may 

1072 # be obsolete. It really says "losed", not "closed". 

1073 "losed the connection unexpectedly", 

1074 # these can occur in newer SSL 

1075 "connection has been closed unexpectedly", 

1076 "SSL error: decryption failed or bad record mac", 

1077 "SSL SYSCALL error: Bad file descriptor", 

1078 "SSL SYSCALL error: EOF detected", 

1079 "SSL SYSCALL error: Operation timed out", 

1080 "SSL SYSCALL error: Bad address", 

1081 ]: 

1082 idx = str_e.find(msg) 

1083 if idx >= 0 and '"' not in str_e[:idx]: 

1084 return True 

1085 return False 

1086 

1087 

1088dialect = PGDialect_psycopg2