Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

467 statements  

1"""History related magics and functionality""" 

2 

3from __future__ import annotations 

4 

5# Copyright (c) IPython Development Team. 

6# Distributed under the terms of the Modified BSD License. 

7 

8 

9import atexit 

10import datetime 

11import re 

12 

13 

14import threading 

15from pathlib import Path 

16 

17from collections import defaultdict 

18from contextlib import contextmanager 

19from dataclasses import dataclass 

20from decorator import decorator 

21from traitlets import ( 

22 Any, 

23 Bool, 

24 Dict, 

25 Instance, 

26 Integer, 

27 List, 

28 TraitError, 

29 Unicode, 

30 Union, 

31 default, 

32 observe, 

33) 

34from traitlets.config.configurable import LoggingConfigurable 

35 

36from IPython.paths import locate_profile 

37from IPython.utils.decorators import undoc 

38from typing import Iterable, Tuple, Optional, TYPE_CHECKING 

39import typing 

40from warnings import warn 

41from weakref import ref, WeakSet 

42 

43if TYPE_CHECKING: 

44 from IPython.core.interactiveshell import InteractiveShell 

45 from IPython.config.Configuration import Configuration 

46 

try:
    # Import the exception classes by name so the rest of the module can
    # catch them without caring whether sqlite3 is actually available.
    from sqlite3 import DatabaseError, OperationalError
    import sqlite3

    # Values read from columns declared as "timestamp" arrive as bytes;
    # decode them and parse the ISO-format text into datetime objects.
    sqlite3.register_converter(
        "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode())
    )

    # Used as the default value of HistoryAccessor.enabled.
    sqlite3_found = True
except ModuleNotFoundError:
    # sqlite3 could not be imported: record that, and provide stand-in
    # exception classes so ``except (DatabaseError, OperationalError)``
    # clauses elsewhere in this module still resolve.
    sqlite3_found = False

    class DatabaseError(Exception):  # type: ignore [no-redef]
        pass

    class OperationalError(Exception):  # type: ignore [no-redef]
        pass

64 

65 

66InOrInOut = typing.Union[str, tuple[str, Optional[str]]] 

67 

68# ----------------------------------------------------------------------------- 

69# Classes and functions 

70# ----------------------------------------------------------------------------- 

71 

72 

@undoc
class DummyDB:
    """No-op stand-in for a SQLite connection.

    Every operation succeeds and stores nothing, so history code can run
    unchanged when there is no real database behind it.
    """

    def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
        """Accept any query and report that it produced no rows."""
        no_rows: list = []
        return no_rows

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Pretend to commit; there is nothing to write."""
        return None

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Enter a ``with`` block; no transaction is started."""
        return None

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Leave a ``with`` block; exceptions are not suppressed."""
        return None

90 

91 

@decorator
def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: call the method only if ``self.enabled`` is true.

    When history is disabled the wrapped method is skipped entirely and an
    empty list is returned in its place.
    """
    if self.enabled:
        return f(self, *a, **kw)
    return []

99 

100 

101# use 16kB as threshold for whether a corrupt history db should be saved 

102# that should be at least 100 entries or so 

103_SAVE_DB_SIZE = 16384 

104 

105 

@decorator
def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Run a HistoryAccessor method, recovering from a broken SQLite database.

    If the wrapped call raises ``DatabaseError`` or ``OperationalError`` the
    history file is moved out of the way, ``self.init_db()`` is called to
    create a fresh database, and an empty list is returned instead of the
    method's result.

    Once more than ``self._corrupt_db_limit`` consecutive failures have been
    counted, ``self.hist_file`` is switched to ``":memory:"`` so nothing is
    written to disk any more. A failure while already using ``":memory:"``
    is re-raised.

    We avoid clobbering larger databases because this may be triggered due to
    filesystem issues, not just a corrupt file.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if self.hist_file == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # hist_file may be a plain string as well as a Path (the trait
            # accepts both), so normalise it before using Path-only methods.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                base = str(hist_path.parent / hist_path.stem)
                ext = hist_path.suffix
                size = hist_path.stat().st_size
                if size >= _SAVE_DB_SIZE:
                    # if there's significant content, avoid clobbering
                    now = (
                        datetime.datetime.now(datetime.timezone.utc)
                        .isoformat()
                        .replace(":", ".")
                    )
                    newpath = base + "-corrupt-" + now + ext
                    # don't clobber previous corrupt backups
                    for i in range(100):
                        if not Path(newpath).exists():
                            break
                        else:
                            newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
                else:
                    # not much content, possibly empty; don't worry about clobbering
                    # maybe we should just delete it?
                    newpath = base + "-corrupt" + ext
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []

158 

159 

class HistoryAccessorBase(LoggingConfigurable):
    """An abstract class for History Accessors"""

    # Every method below is a placeholder that raises NotImplementedError;
    # HistoryAccessor in this module provides implementations with the
    # same signatures.

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the last ``n`` history entries (not implemented here)."""
        raise NotImplementedError

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return history entries matching ``pattern`` (not implemented here)."""
        raise NotImplementedError

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return a range of lines from one session (not implemented here)."""
        raise NotImplementedError

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the lines named by a range string (not implemented here)."""
        raise NotImplementedError

197 

198 

199class HistoryAccessor(HistoryAccessorBase): 

200 """Access the history database without adding to it. 

201 

202 This is intended for use by standalone history tools. IPython shells use 

203 HistoryManager, below, which is a subclass of this.""" 

204 

205 # counter for init_db retries, so we don't keep trying over and over 

206 _corrupt_db_counter = 0 

207 # after two failures, fallback on :memory: 

208 _corrupt_db_limit = 2 

209 

210 # String holding the path to the history file 

211 hist_file = Union( 

212 [Instance(Path), Unicode()], 

213 help="""Path to file to use for SQLite history database. 

214 

215 By default, IPython will put the history database in the IPython 

216 profile directory. If you would rather share one history among 

217 profiles, you can set this value in each, so that they are consistent. 

218 

219 Due to an issue with fcntl, SQLite is known to misbehave on some NFS 

220 mounts. If you see IPython hanging, try setting this to something on a 

221 local disk, e.g:: 

222 

223 ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite 

224 

225 you can also use the specific value `:memory:` (including the colon 

226 at both end but not the back ticks), to avoid creating an history file. 

227 

228 """, 

229 ).tag(config=True) 

230 

231 enabled = Bool( 

232 sqlite3_found, 

233 help="""enable the SQLite history 

234 

235 set enabled=False to disable the SQLite history, 

236 in which case there will be no stored history, no SQLite connection, 

237 and no background saving thread. This may be necessary in some 

238 threaded environments where IPython is embedded. 

239 """, 

240 ).tag(config=True) 

241 

242 connection_options = Dict( 

243 help="""Options for configuring the SQLite connection 

244 

245 These options are passed as keyword args to sqlite3.connect 

246 when establishing database connections. 

247 """ 

248 ).tag(config=True) 

249 

@default("connection_options")
def _default_connection_options(self) -> dict[str, bool]:
    """Default keyword arguments for ``sqlite3.connect``.

    ``check_same_thread`` is turned off by default.
    """
    return {"check_same_thread": False}

253 

254 # The SQLite database 

255 db = Any() 

256 

@observe("db")
@only_when_enabled
def _db_changed(self, change):  # type: ignore [no-untyped-def]
    """Reject a new ``db`` value that is not a connection-like object.

    ``db`` is declared as ``Any`` because it can hold either a real
    ``sqlite3.Connection`` or a ``DummyDB``, so the check is done here.

    Raises
    ------
    TraitError
        If the new value is neither of the two accepted types.
    """
    candidate = change["new"]
    if isinstance(candidate, (DummyDB, sqlite3.Connection)):
        return
    owner = self.__class__.__name__
    raise TraitError(
        "%s.db must be sqlite3 Connection or DummyDB, not %r" % (owner, candidate)
    )

269 

def __init__(
    self, profile: str = "default", hist_file: str = "", **traits: typing.Any
) -> None:
    """Create a new history accessor.

    Parameters
    ----------
    profile : str
        Name of the profile whose history should be opened. Only used
        when no history file has been set any other way.
    hist_file : str
        Path to an SQLite history database stored by IPython. A non-empty
        value takes precedence over ``profile``.
    config : :class:`~traitlets.config.loader.Config`
        Config object. hist_file can also be set through this.
    """
    super().__init__(**traits)

    # Apply the keyword only after the traits are initialised and only if
    # it was actually given; otherwise its empty default would overwrite
    # a value coming from config.
    if hist_file:
        self.hist_file = hist_file

    try:
        self.hist_file
    except TraitError:
        # Nothing has provided a history file yet: use the profile's one.
        self.hist_file = self._get_hist_file_name(profile)

    self.init_db()

299 

def _get_hist_file_name(self, profile: str = "default") -> Path:
    """Return the path of the history database for a named profile.

    The HistoryManager subclass overrides this to use the profile of the
    shell it is attached to.

    Parameters
    ----------
    profile : str
        The name of a profile which has a history file.
    """
    profile_dir = Path(locate_profile(profile))
    return profile_dir / "history.sqlite"

312 

@catch_corrupt_db
def init_db(self) -> None:
    """Open the history database and make sure its tables exist.

    When history is disabled a ``DummyDB`` is installed instead and no
    SQLite connection is made.
    """
    if not self.enabled:
        self.db = DummyDB()
        return

    # detect_types makes timestamp columns come back as datetime objects;
    # the configured connection options are applied on top of that.
    connect_kwargs = {
        "detect_types": sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        **self.connection_options,
    }
    self.db = sqlite3.connect(str(self.hist_file), **connect_kwargs)  # type: ignore [call-overload]

    schema = (
        """CREATE TABLE IF NOT EXISTS sessions (session integer
                        primary key autoincrement, start timestamp,
                        end timestamp, num_cmds integer, remark text)""",
        """CREATE TABLE IF NOT EXISTS history
                (session integer, line integer, source text, source_raw text,
                PRIMARY KEY (session, line))""",
        # Output history is optional, but ensure the table's there so it
        # can be enabled later.
        """CREATE TABLE IF NOT EXISTS output_history
                        (session integer, line integer, output text,
                        PRIMARY KEY (session, line))""",
    )
    with self.db:
        for statement in schema:
            self.db.execute(statement)

    # success! reset corrupt db count
    self._corrupt_db_counter = 0

344 

def writeout_cache(self) -> None:
    """Hook called before certain database lookups; does nothing here.

    HistoryManager overrides it to write its cache to the database first.
    """
    return None

349 

350 ## ------------------------------- 

351 ## Methods for retrieving history: 

352 ## ------------------------------- 

353 def _run_sql( 

354 self, 

355 sql: str, 

356 params: tuple, 

357 raw: bool = True, 

358 output: bool = False, 

359 latest: bool = False, 

360 ) -> Iterable[tuple[int, int, InOrInOut]]: 

361 """Prepares and runs an SQL query for the history database. 

362 

363 Parameters 

364 ---------- 

365 sql : str 

366 Any filtering expressions to go after SELECT ... FROM ... 

367 params : tuple 

368 Parameters passed to the SQL query (to replace "?") 

369 raw, output : bool 

370 See :meth:`get_range` 

371 latest : bool 

372 Select rows with max (session, line) 

373 

374 Returns 

375 ------- 

376 Tuples as :meth:`get_range` 

377 """ 

378 toget = "source_raw" if raw else "source" 

379 sqlfrom = "history" 

380 if output: 

381 sqlfrom = "history LEFT JOIN output_history USING (session, line)" 

382 toget = "history.%s, output_history.output" % toget 

383 if latest: 

384 toget += ", MAX(session * 128 * 1024 + line)" 

385 this_querry = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql 

386 cur = self.db.execute(this_querry, params) 

387 if latest: 

388 cur = (row[:-1] for row in cur) 

389 if output: # Regroup into 3-tuples, and parse JSON 

390 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur) 

391 return cur 

392 

@only_when_enabled
@catch_corrupt_db
def get_session_info(
    self, session: int
) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
    """Look up the ``sessions`` table row for one session.

    Parameters
    ----------
    session : int
        Session number to retrieve.

    Returns
    -------
    session_id : int
        Session ID number
    start : datetime
        Timestamp for the start of the session.
    end : datetime
        Timestamp for the end of the session, or None if IPython crashed.
    num_cmds : int
        Number of commands run, or None if IPython crashed.
    remark : str
        A manually set description.
    """
    cursor = self.db.execute(
        "SELECT * from sessions where session == ?", (session,)
    )
    return cursor.fetchone()

420 

@catch_corrupt_db
def get_last_session_id(self) -> Optional[int]:
    """Return the session ID of the most recent history entry.

    Within IPython, this should be the same as the value stored in
    :attr:`HistoryManager.session_number`. Returns None when there are
    no history entries at all.
    """
    newest = next(iter(self.get_tail(n=1, include_latest=True)), None)
    if newest is None:
        return None
    return newest[0]

431 

@catch_corrupt_db
def get_tail(
    self,
    n: int = 10,
    raw: bool = True,
    output: bool = False,
    include_latest: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Get the last n lines from the history database, oldest first.

    Parameters
    ----------
    n : int
        The number of lines to get
    raw, output : bool
        See :meth:`get_range`
    include_latest : bool
        If False (default), n+1 lines are fetched, and the latest one
        is discarded. This is intended to be used where the function
        is called by a user command, which it should not return.

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    self.writeout_cache()
    limit = n if include_latest else n + 1
    newest_first = list(
        self._run_sql(
            "ORDER BY session DESC, line DESC LIMIT ?",
            (limit,),
            raw=raw,
            output=output,
        )
    )
    if not include_latest:
        # Drop the single most recent entry.
        newest_first = newest_first[1:]
    return reversed(newest_first)

466 

@catch_corrupt_db
def search(
    self,
    pattern: str = "*",
    raw: bool = True,
    search_raw: bool = True,
    output: bool = False,
    n: Optional[int] = None,
    unique: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Search the database using unix glob-style matching (wildcards
    * and ?).

    Parameters
    ----------
    pattern : str
        The wildcarded pattern to match when searching
    search_raw : bool
        If True, search the raw input, otherwise, the parsed input
    raw, output : bool
        See :meth:`get_range`
    n : None or int
        If an integer is given, at most that many entries are returned.
    unique : bool
        When it is true, return only unique entries.

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    column = "source_raw" if search_raw else "source"
    if output:
        # The query uses a join when output is requested, so the column
        # is qualified with its table name.
        column = "history." + column
    self.writeout_cache()

    clauses = ["WHERE %s GLOB ?" % column]
    params: tuple[typing.Any, ...] = (pattern,)
    if unique:
        clauses.append("GROUP BY %s" % column)
    limited = n is not None
    if limited:
        clauses.append("ORDER BY session DESC, line DESC LIMIT ?")
        params += (n,)
    elif unique:
        clauses.append("ORDER BY session, line")

    matches = self._run_sql(
        " ".join(clauses), params, raw=raw, output=output, latest=unique
    )
    if limited:
        # The limited query is ordered newest first; return it reversed.
        return reversed(list(matches))
    return matches

515 

@catch_corrupt_db
def get_range(
    self,
    session: int,
    start: int = 1,
    stop: Optional[int] = None,
    raw: bool = True,
    output: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Retrieve input by session.

    Parameters
    ----------
    session : int
        Session number to retrieve.
    start : int
        First line to retrieve.
    stop : int
        End of line range (excluded from output itself). If None, retrieve
        to the end of the session.
    raw : bool
        If True, return untranslated input
    output : bool
        If True, attempt to include output. This will be 'real' Python
        objects for the current session, or text reprs from previous
        sessions if db_log_output was enabled at the time. Where no output
        is found, None is used.

    Returns
    -------
    entries
        An iterator over the desired lines. Each line is a 3-tuple, either
        (session, line, input) if output is False, or
        (session, line, (input, output)) if output is True.
    """
    params: tuple[typing.Any, ...]
    if stop:
        where = "WHERE session==? AND %s" % "line >= ? AND line < ?"
        params = (session, start, stop)
    else:
        # No upper bound: everything from ``start`` onwards.
        where = "WHERE session==? AND %s" % "line>=?"
        params = (session, start)
    return self._run_sql(where, params, raw=raw, output=output)

562 

def get_range_by_str(
    self, rangestr: str, raw: bool = True, output: bool = False
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Yield the history lines selected by a range string, as used by
    magic commands %hist, %save, %macro, etc.

    Parameters
    ----------
    rangestr : str
        A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used,
        this will return everything from current session's history.

        See the documentation of :func:`%history` for the full details.

    raw, output : bool
        As :meth:`get_range`

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    # Each parsed range is (session, first line, end line) and is handed
    # to get_range in the order it appears in the string.
    for session, first, last in extract_hist_ranges(rangestr):
        entries = self.get_range(session, first, last, raw=raw, output=output)
        yield from entries

586 

587 

@dataclass
class HistoryOutput:
    # Which kind of output this record holds; restricted to the four
    # literal names below.
    output_type: typing.Literal[
        "out_stream", "err_stream", "display_data", "execute_result"
    ]
    # String-to-string mapping holding the output itself
    # (presumably MIME type -> data; confirm against the code that fills it).
    bundle: typing.Dict[str, str]

594 

595 

596class HistoryManager(HistoryAccessor): 

597 """A class to organize all history-related functionality in one place.""" 

598 

599 # Public interface 

600 

601 # An instance of the IPython shell we are attached to 

602 shell = Instance( 

603 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False 

604 ) 

605 # Lists to hold processed and raw history. These start with a blank entry 

606 # so that we can index them starting from 1 

607 input_hist_parsed = List([""]) 

608 input_hist_raw = List([""]) 

609 # A list of directories visited during session 

610 dir_hist: List = List() 

611 

@default("dir_hist")
def _dir_hist_default(self) -> list[Path]:
    """Initial directory history: the current working directory.

    If the working directory cannot be determined the history starts
    out empty.
    """
    try:
        cwd = Path.cwd()
    except OSError:
        return []
    return [cwd]

618 

619 # A dict of output history, keyed with ints from the shell's 

620 # execution count. 

621 output_hist = Dict() 

622 # The text/plain repr of outputs. 

623 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment] 

624 # Maps execution_count to MIME bundles 

625 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list) 

626 # Maps execution_count to exception tracebacks 

627 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment] 

628 

629 # The number of the current session in the history database 

630 session_number: int = Integer() # type: ignore [assignment] 

631 

632 db_log_output = Bool( 

633 False, help="Should the history database include output? (default: no)" 

634 ).tag(config=True) 

635 db_cache_size = Integer( 

636 0, 

637 help="Write to database every x commands (higher values save disk access & power).\n" 

638 "Values of 1 or less effectively disable caching.", 

639 ).tag(config=True) 

640 # The input and output caches 

641 db_input_cache: List[tuple[int, str, str]] = List() 

642 db_output_cache: List[tuple[int, str]] = List() 

643 

644 # History saving in separate thread 

645 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True) 

646 

@property
def save_flag(self) -> threading.Event | None:
    """The saving thread's ``save_flag`` event, or None without a thread."""
    thread = self.save_thread
    return None if thread is None else thread.save_flag

652 

653 # Private interface 

654 # Variables used to store the three last inputs from the user. On each new 

655 # history update, we populate the user's namespace with these, shifted as 

656 # necessary. 

657 _i00 = Unicode("") 

658 _i = Unicode("") 

659 _ii = Unicode("") 

660 _iii = Unicode("") 

661 

662 # A regex matching all forms of the exit command, so that we don't store 

663 # them in the history (it's annoying to rewind the first entry and land on 

664 # an exit call). 

665 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$") 

666 

667 _instances: WeakSet[HistoryManager] = WeakSet() 

668 _max_inst: int | float = float("inf") 

669 

def __init__(
    self,
    shell: InteractiveShell,
    config: Optional[Configuration] = None,
    **traits: typing.Any,
):
    """Create a new history manager associated with a shell instance.

    Parameters
    ----------
    shell : InteractiveShell
        The shell this manager records history for.
    config : Configuration, optional
        Passed through to the parent class initialiser.
    **traits
        Additional trait values, passed through to the parent class.
    """
    super().__init__(shell=shell, config=config, **traits)
    # Locks guarding db_input_cache and db_output_cache respectively.
    self.db_input_cache_lock = threading.Lock()
    self.db_output_cache_lock = threading.Lock()

    try:
        self.new_session()
    except OperationalError:
        # Could not register a session in the database: report it and
        # switch the history file to the in-memory value.
        self.log.error(
            "Failed to create history session in %s. History will not be saved.",
            self.hist_file,
            exc_info=True,
        )
        self.hist_file = ":memory:"

    # A background saving thread is only created for an enabled,
    # on-disk history.
    if self.enabled and self.hist_file != ":memory:":
        self.save_thread = HistorySavingThread(self)
        try:
            self.save_thread.start()
        except RuntimeError:
            self.log.error(
                "Failed to start history saving thread. History will not be saved.",
                exc_info=True,
            )
            self.hist_file = ":memory:"
    # Track live managers in the class-level WeakSet and check the count
    # against the class-level maximum (infinite by default).
    self._instances.add(self)
    assert len(HistoryManager._instances) <= HistoryManager._max_inst, (
        len(HistoryManager._instances),
        HistoryManager._max_inst,
    )

706 

707 def __del__(self) -> None: 

708 if self.save_thread is not None: 

709 self.save_thread.stop() 

710 

711 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path: 

712 """Get default history file name based on the Shell's profile. 

713 

714 The profile parameter is ignored, but must exist for compatibility with 

715 the parent class.""" 

716 profile_dir = self.shell.profile_dir.location 

717 return Path(profile_dir) / "history.sqlite" 

718 

@only_when_enabled
def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Insert a new row into ``sessions`` and adopt its id as the session number.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to use; defaults to ``self.db``.
    """
    connection = self.db if conn is None else conn

    with connection:
        # NOTE(review): the start time is written as naive local time,
        # while end_session writes a timezone-aware UTC time - confirm
        # which of the two is intended.
        cursor = connection.execute(
            """INSERT INTO sessions VALUES (NULL, ?, NULL,
                        NULL, '') """,
            (datetime.datetime.now().isoformat(" "),),
        )
        assert isinstance(cursor.lastrowid, int)
        self.session_number = cursor.lastrowid

733 

def end_session(self) -> None:
    """Close the database session, filling in the end time and line count.

    The cache is written out first; afterwards ``session_number`` is
    reset to 0.
    """
    self.writeout_cache()
    finished = datetime.datetime.now(datetime.timezone.utc).isoformat(" ")
    # input_hist_parsed starts with a blank placeholder entry, hence the -1.
    command_count = len(self.input_hist_parsed) - 1
    with self.db:
        self.db.execute(
            """UPDATE sessions SET end=?, num_cmds=? WHERE
                        session==?""",
            (finished, command_count, self.session_number),
        )
    self.session_number = 0

748 

def name_session(self, name: str) -> None:
    """Store ``name`` as the remark of the current session (deprecated).

    Emits a DeprecationWarning on every call.
    """
    warn(
        "name_session is deprecated in IPython 9.0 and will be removed in future versions",
        DeprecationWarning,
        stacklevel=2,
    )
    update = "UPDATE sessions SET remark=? WHERE session==?"
    with self.db:
        self.db.execute(update, (name, self.session_number))

761 

def reset(self, new_session: bool = True) -> None:
    """Clear the session history, releasing all object references, and
    optionally open a new session.

    Parameters
    ----------
    new_session : bool
        If True (default), also end the current session (if any), empty
        the input history lists and start a new session.
    """
    for store in (self.output_hist, self.outputs, self.exceptions):
        store.clear()

    # The directory history can't be completely empty
    self.dir_hist[:] = [Path.cwd()]

    if not new_session:
        return

    if self.session_number:
        self.end_session()
    # Both input lists keep a blank placeholder entry at index 0.
    self.input_hist_parsed[:] = [""]
    self.input_hist_raw[:] = [""]
    self.new_session()

778 

779 # ------------------------------ 

780 # Methods for retrieving history 

781 # ------------------------------ 

def get_session_info(
    self, session: int = 0
) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
    """Get info about a session, addressed relative to the current one.

    Parameters
    ----------
    session : int
        Session number to retrieve. The current session is 0, and negative
        numbers count back from current session, so -1 is the previous session.

    Returns
    -------
    session_id : int
        Session ID number
    start : datetime
        Timestamp for the start of the session.
    end : datetime
        Timestamp for the end of the session, or None if IPython crashed.
    num_cmds : int
        Number of commands run, or None if IPython crashed.
    remark : str
        A manually set description.
    """
    # Zero and negative numbers are offsets from the current session.
    absolute = session + self.session_number if session <= 0 else session
    return super().get_session_info(session=absolute)

810 

@catch_corrupt_db
def get_tail(
    self,
    n: int = 10,
    raw: bool = True,
    output: bool = False,
    include_latest: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Get the last n lines from the history database.

    Most recent entry last.

    Entries of the current session are placed last; entries from other
    sessions only fill whatever room is left before them.

    Parameters
    ----------
    n : int
        The number of lines to get
    raw, output : bool
        See :meth:`get_range`
    include_latest : bool
        If False (default), n+1 lines are fetched, and the latest one
        is discarded. This is intended to be used where the function
        is called by a user command, which it should not return.

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    self.writeout_cache()
    limit = n if include_latest else n + 1

    def fetch(where: str) -> list[tuple[int, int, InOrInOut]]:
        # Both queries bind the current session number and the same limit.
        return list(
            self._run_sql(
                where, (self.session_number, limit), raw=raw, output=output
            )
        )

    current = fetch("WHERE session == ? ORDER BY line DESC LIMIT ? ")
    others = fetch("WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?")

    # Newest first, current session ahead of the others, cut to the limit.
    newest_first = (current + others)[:limit]
    if not include_latest:
        newest_first = newest_first[1:]
    newest_first.reverse()
    return newest_first

869 

870 def _get_range_session( 

871 self, 

872 start: int = 1, 

873 stop: Optional[int] = None, 

874 raw: bool = True, 

875 output: bool = False, 

876 ) -> Iterable[tuple[int, int, InOrInOut]]: 

877 """Get input and output history from the current session. Called by 

878 get_range, and takes similar parameters.""" 

879 input_hist = self.input_hist_raw if raw else self.input_hist_parsed 

880 

881 n = len(input_hist) 

882 if start < 0: 

883 start += n 

884 if not stop or (stop > n): 

885 stop = n 

886 elif stop < 0: 

887 stop += n 

888 line: InOrInOut 

889 for i in range(start, stop): 

890 if output: 

891 line = (input_hist[i], self.output_hist_reprs.get(i)) 

892 else: 

893 line = input_hist[i] 

894 yield (0, i, line) 

895 

def get_range(
    self,
    session: int = 0,
    start: int = 1,
    stop: Optional[int] = None,
    raw: bool = True,
    output: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Retrieve input by session.

    Parameters
    ----------
    session : int
        Session number to retrieve. The current session is 0, and negative
        numbers count back from current session, so -1 is previous session.
    start : int
        First line to retrieve.
    stop : int
        End of line range (excluded from output itself). If None, retrieve
        to the end of the session.
    raw : bool
        If True, return untranslated input
    output : bool
        If True, attempt to include output. This will be 'real' Python
        objects for the current session, or text reprs from previous
        sessions if db_log_output was enabled at the time. Where no output
        is found, None is used.

    Returns
    -------
    entries
        An iterator over the desired lines. Each line is a 3-tuple, either
        (session, line, input) if output is False, or
        (session, line, (input, output)) if output is True.
    """
    # Zero and negative numbers are offsets from the current session.
    absolute = session + self.session_number if session <= 0 else session
    if absolute != self.session_number:
        # Another session: read it from the database via the parent class.
        return super().get_range(absolute, start, stop, raw, output)
    # Current session: served from the in-memory history lists.
    return self._get_range_session(start, stop, raw, output)

936 

937 ## ---------------------------- 

938 ## Methods for storing history: 

939 ## ---------------------------- 

def store_inputs(
    self, line_num: int, source: str, source_raw: Optional[str] = None
) -> None:
    """Store source and raw input in history and create input cache
    variables ``_i*``.

    Parameters
    ----------
    line_num : int
        The prompt number of this input.
    source : str
        Python input.
    source_raw : str, optional
        If given, this is the raw input without any IPython transformations
        applied to it. If not given, ``source`` is used.
    """
    raw_text = (source if source_raw is None else source_raw).rstrip("\n")
    parsed_text = source.rstrip("\n")

    # do not store exit/quit commands
    if self._exit_re.match(raw_text.strip()):
        return

    self.input_hist_parsed.append(parsed_text)
    self.input_hist_raw.append(raw_text)

    with self.db_input_cache_lock:
        self.db_input_cache.append((line_num, parsed_text, raw_text))
        # Trigger to flush cache and write to DB.
        if len(self.db_input_cache) >= self.db_cache_size:
            flag = self.save_flag
            if flag:
                flag.set()

    # Shift the auto _i variables by one; the right-hand side is evaluated
    # before any of the assignments happen.
    self._iii, self._ii, self._i, self._i00 = (
        self._ii,
        self._i,
        self._i00,
        raw_text,
    )

    # hackish access to user namespace to create _i1,_i2... dynamically
    numbered_name = "_i%s" % line_num
    to_main = {
        "_i": self._i,
        "_ii": self._ii,
        "_iii": self._iii,
        numbered_name: self._i00,
    }

    if self.shell is not None:
        self.shell.push(to_main, interactive=False)

987 

def store_output(self, line_num: int) -> None:
    """If database output logging is enabled, this saves all the
    outputs from the indicated prompt number to the database. It's
    called by run_cell after code has been executed.

    Nothing happens when ``db_log_output`` is false or when no output
    repr was recorded for ``line_num``.  Otherwise the
    ``(line_num, output)`` pair is queued in ``db_output_cache`` and, when
    ``db_cache_size`` is at most 1, the ``save_flag`` event is set.

    Parameters
    ----------
    line_num : int
        The line number from which to save outputs
    """
    if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
        return
    output = self.output_hist_reprs[line_num]

    with self.db_output_cache_lock:
        self.db_output_cache.append((line_num, output))
    # ``save_flag`` may be None, so guard before signalling.
    if self.db_cache_size <= 1 and self.save_flag is not None:
        self.save_flag.set()

1007 

1008 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None: 

1009 with conn: 

1010 for line in self.db_input_cache: 

1011 conn.execute( 

1012 "INSERT INTO history VALUES (?, ?, ?, ?)", 

1013 (self.session_number,) + line, 

1014 ) 

1015 

1016 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None: 

1017 with conn: 

1018 for line in self.db_output_cache: 

1019 conn.execute( 

1020 "INSERT INTO output_history VALUES (?, ?, ?)", 

1021 (self.session_number,) + line, 

1022 ) 

1023 

@only_when_enabled
def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Write any entries in the cache to the database.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to write through.  ``self.db`` is used when omitted.

    Notes
    -----
    Both caches are emptied once their write has been attempted, whether
    or not it succeeded.  If the input rows collide with existing
    session/line numbers, a new session is started and the write is
    retried exactly once; colliding output rows are dropped with a message.
    """
    target = self.db if conn is None else conn

    # --- inputs ---------------------------------------------------------
    with self.db_input_cache_lock:
        try:
            self._writeout_input_cache(target)
        except sqlite3.IntegrityError:
            self.new_session(target)
            print(
                "ERROR! Session/line number was not unique in",
                "database. History logging moved to new session",
                self.session_number,
            )
            try:
                # One retry against the fresh session; a second failure
                # is deliberately ignored rather than recursing.
                self._writeout_input_cache(target)
            except sqlite3.IntegrityError:
                pass
        finally:
            self.db_input_cache = []

    # --- outputs --------------------------------------------------------
    with self.db_output_cache_lock:
        try:
            self._writeout_output_cache(target)
        except sqlite3.IntegrityError:
            print(
                "!! Session/line number for output was not unique",
                "in database. Output will not be stored.",
            )
        finally:
            self.db_output_cache = []

1059 

1060 

1061from typing import Callable, Iterator 

1062from weakref import ReferenceType 

1063 

1064 

@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that keeps the target of a weak reference alive.

    The weak reference is dereferenced on entry and the resulting strong
    reference is held until the ``with`` block finishes, so the referent
    cannot be garbage collected in between.  The weak reference itself is
    what gets yielded.
    """
    strong = ref()
    yield ref
    del strong

1074 

1075 

class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for the HistoryManager's save_flag to be set, then writes out
    the history cache. The main thread is responsible for setting the flag when
    the cache size reaches a defined threshold.

    Only a weak reference to the HistoryManager is kept; once the manager
    has been garbage collected the thread exits the next time it wakes up.
    """

    save_flag: threading.Event
    daemon: bool = True
    _stop_now: bool = False
    enabled: bool = True
    history_manager: ref[HistoryManager]
    _stopped = False

    def __init__(self, history_manager: HistoryManager) -> None:
        """Create the (not yet started) saving thread for *history_manager*."""
        super().__init__(name="IPythonHistorySavingThread")
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled
        self.save_flag = threading.Event()

    @only_when_enabled
    def run(self) -> None:
        """Open a private database connection and flush the cache on demand.

        Loops until ``stop()`` is called or the HistoryManager disappears.
        Any unexpected exception is reported on stdout and ends the thread.
        The connection is always closed on the way out.
        """
        atexit.register(self.stop)
        # We need a separate db connection per thread:
        try:
            hm: ReferenceType[HistoryManager]
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone: there is nothing to
                    # connect to and nothing that could ever be saved.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **hm().connection_options,  # type: ignore [union-attr]
                )
            while True:
                # No strong reference is held while sleeping, so the
                # manager can be collected in the meantime.
                self.save_flag.wait()
                with hold(self.history_manager) as hm:
                    if hm() is None:
                        self._stop_now = True
                    if self._stop_now:
                        return
                    self.save_flag.clear()
                    if hm() is not None:
                        hm().writeout_cache(self.db)  # type: ignore [union-attr]

        except Exception as e:
            print(
                (
                    "The history saving thread hit an unexpected error (%s). "
                    "History will not be written to the database."
                )
                % repr(e)
            )
        finally:
            atexit.unregister(self.stop)
            # Close the connection on every exit path, including errors.
            db = getattr(self, "db", None)
            if db is not None:
                db.close()

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method.

        Calling it more than once is harmless; only the first call acts.
        """
        if self._stopped:
            return
        self._stop_now = True

        # Wake the thread so that it notices ``_stop_now``.
        self.save_flag.set()
        self._stopped = True
        # join() raises RuntimeError for a thread that was never started,
        # and a thread must not join itself.
        if self.is_alive() and self is not threading.current_thread():
            self.join()

    def __del__(self) -> None:
        self.stop()

1149 

1150 

# To match, e.g. ~5/8-~2/3
# Groups: optional "<startsess>/", optional <start> line, then optionally a
# separator ("-" or ":") followed by optional "<endsess>/" and an <end> line.
range_re = re.compile(
    r"""
((?P<startsess>~?\d+)/)?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>~?\d+)/)?
 (?P<end>\d+))?
$""",
    re.VERBOSE,
)


def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    Parameters
    ----------
    ranges_str : str
        Whitespace-separated range specifications such as ``4``, ``1-3``,
        ``2:5``, ``~2/`` or ``~8/5-~7/4``.  A ``~`` prefix on a session
        number makes it negative.  Specifications that do not match
        ``range_re``, or that give neither a start line nor a start
        session, are skipped.

    Yields
    ------
    tuple of (int, int, int or None)
        ``(session, start, stop)`` where ``stop`` is exclusive and ``None``
        means "to the end of that session".  A range spanning several
        sessions yields one tuple per session.

    Raises
    ------
    AssertionError
        If a ``-`` separated range has no start line (e.g. ``~5/-3``), or
        if the end session is earlier than the start session.  These are
        raised explicitly so the checks also run under ``python -O``.

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    """
    if ranges_str == "":
        yield (0, 1, None)  # Everything from current session
        return

    for range_str in ranges_str.split():
        rmatch = range_re.match(range_str)
        if not rmatch:
            continue

        start: int
        end: Optional[int]
        start_text = rmatch.group("start")
        if start_text:
            start = int(start_text)
            end_text = rmatch.group("end")
            # If no end specified, get (a, a + 1)
            end = int(end_text) if end_text else start + 1
        else:  # start not specified
            if not rmatch.group("startsess"):  # no startsess
                continue
            start = 1
            end = None  # provide the entire session hist

        if rmatch.group("sep") == "-":  # 1-3 == 1:4 --> [1, 2, 3]
            if end is None:
                raise AssertionError(
                    "an inclusive ('-') range requires an explicit start line"
                )
            end += 1

        startsess_text = rmatch.group("startsess") or "0"
        endsess_text = rmatch.group("endsess") or startsess_text
        startsess = int(startsess_text.replace("~", "-"))
        endsess = int(endsess_text.replace("~", "-"))
        if endsess < startsess:
            raise AssertionError("start session must be earlier than end session")

        if endsess == startsess:
            yield (startsess, start, end)
            continue
        # Multiple sessions in one range:
        yield (startsess, start, None)
        for sess in range(startsess + 1, endsess):
            yield (sess, 1, None)
        yield (endsess, 1, end)

1212 

1213 

1214def _format_lineno(session: int, line: int) -> str: 

1215 """Helper function to format line numbers properly.""" 

1216 if session == 0: 

1217 return str(line) 

1218 return "%s#%s" % (session, line)