Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiosqlite/cursor.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

72 statements  

1# Copyright Amethyst Reese 

2# Licensed under the MIT license 

3 

4import sqlite3 

5from collections.abc import AsyncIterator, Iterable 

6from typing import Any, Callable, Optional, TYPE_CHECKING 

7 

8if TYPE_CHECKING: 

9 from .core import Connection 

10 

11 

12class Cursor: 

13 def __init__(self, conn: "Connection", cursor: sqlite3.Cursor) -> None: 

14 self.iter_chunk_size = conn._iter_chunk_size 

15 self._conn = conn 

16 self._cursor = cursor 

17 

18 def __aiter__(self) -> AsyncIterator[sqlite3.Row]: 

19 """The cursor proxy is also an async iterator.""" 

20 return self._fetch_chunked() 

21 

22 async def _fetch_chunked(self): 

23 while True: 

24 rows = await self.fetchmany(self.iter_chunk_size) 

25 if not rows: 

26 return 

27 for row in rows: 

28 yield row 

29 

30 async def _execute(self, fn, *args, **kwargs): 

31 """Execute the given function on the shared connection's thread.""" 

32 return await self._conn._execute(fn, *args, **kwargs) 

33 

34 async def execute( 

35 self, sql: str, parameters: Optional[Iterable[Any]] = None 

36 ) -> "Cursor": 

37 """Execute the given query.""" 

38 if parameters is None: 

39 parameters = [] 

40 await self._execute(self._cursor.execute, sql, parameters) 

41 return self 

42 

43 async def executemany( 

44 self, sql: str, parameters: Iterable[Iterable[Any]] 

45 ) -> "Cursor": 

46 """Execute the given multiquery.""" 

47 await self._execute(self._cursor.executemany, sql, parameters) 

48 return self 

49 

50 async def executescript(self, sql_script: str) -> "Cursor": 

51 """Execute a user script.""" 

52 await self._execute(self._cursor.executescript, sql_script) 

53 return self 

54 

55 async def fetchone(self) -> Optional[sqlite3.Row]: 

56 """Fetch a single row.""" 

57 return await self._execute(self._cursor.fetchone) 

58 

59 async def fetchmany(self, size: Optional[int] = None) -> Iterable[sqlite3.Row]: 

60 """Fetch up to `cursor.arraysize` number of rows.""" 

61 args: tuple[int, ...] = () 

62 if size is not None: 

63 args = (size,) 

64 return await self._execute(self._cursor.fetchmany, *args) 

65 

66 async def fetchall(self) -> Iterable[sqlite3.Row]: 

67 """Fetch all remaining rows.""" 

68 return await self._execute(self._cursor.fetchall) 

69 

70 async def close(self) -> None: 

71 """Close the cursor.""" 

72 await self._execute(self._cursor.close) 

73 

74 @property 

75 def rowcount(self) -> int: 

76 return self._cursor.rowcount 

77 

78 @property 

79 def lastrowid(self) -> Optional[int]: 

80 return self._cursor.lastrowid 

81 

82 @property 

83 def arraysize(self) -> int: 

84 return self._cursor.arraysize 

85 

86 @arraysize.setter 

87 def arraysize(self, value: int) -> None: 

88 self._cursor.arraysize = value 

89 

90 @property 

91 def description(self) -> tuple[tuple[str, None, None, None, None, None, None], ...]: 

92 return self._cursor.description 

93 

94 @property 

95 def row_factory(self) -> Optional[Callable[[sqlite3.Cursor, sqlite3.Row], object]]: 

96 return self._cursor.row_factory 

97 

98 @row_factory.setter 

99 def row_factory(self, factory: Optional[type]) -> None: 

100 self._cursor.row_factory = factory 

101 

102 @property 

103 def connection(self) -> sqlite3.Connection: 

104 return self._cursor.connection 

105 

106 async def __aenter__(self): 

107 return self 

108 

109 async def __aexit__(self, exc_type, exc_val, exc_tb): 

110 await self.close()