Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

470 statements  

1"""History related magics and functionality""" 

2 

3from __future__ import annotations 

4 

5# Copyright (c) IPython Development Team. 

6# Distributed under the terms of the Modified BSD License. 

7 

8 

9import atexit 

10import datetime 

11import re 

12 

13 

14import threading 

15from pathlib import Path 

16 

17from collections import defaultdict 

18from contextlib import contextmanager 

19from dataclasses import dataclass 

20from decorator import decorator 

21from traitlets import ( 

22 Any, 

23 Bool, 

24 Dict, 

25 Instance, 

26 Integer, 

27 List, 

28 TraitError, 

29 Unicode, 

30 Union, 

31 default, 

32 observe, 

33) 

34from traitlets.config.configurable import LoggingConfigurable 

35 

36from IPython.paths import locate_profile 

37from IPython.utils.decorators import undoc 

38from typing import Tuple, Optional, TYPE_CHECKING 

39from collections.abc import Iterable 

40import typing 

41from warnings import warn 

42from weakref import ref, WeakSet 

43 

44if TYPE_CHECKING: 

45 from IPython.core.interactiveshell import InteractiveShell 

46 from IPython.config.Configuration import Configuration 

47 

48try: 

49 from sqlite3 import DatabaseError, OperationalError 

50 import sqlite3 

51 

52 sqlite3.register_converter( 

53 "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode()) 

54 ) 

55 

56 sqlite3_found = True 

57except ModuleNotFoundError: 

58 sqlite3_found = False 

59 

60 class DatabaseError(Exception): # type: ignore [no-redef] 

61 pass 

62 

63 class OperationalError(Exception): # type: ignore [no-redef] 

64 pass 

65 

66 

67InOrInOut = typing.Union[str, tuple[str, Optional[str]]] 

68 

69# ----------------------------------------------------------------------------- 

70# Classes and functions 

71# ----------------------------------------------------------------------------- 

72 

73 

74@undoc 

75class DummyDB: 

76 """Dummy DB that will act as a black hole for history. 

77 

78 Only used in the absence of sqlite""" 

79 

80 def execute(*args: typing.Any, **kwargs: typing.Any) -> list: 

81 return [] 

82 

83 def commit(self, *args, **kwargs): # type: ignore [no-untyped-def] 

84 pass 

85 

86 def __enter__(self, *args, **kwargs): # type: ignore [no-untyped-def] 

87 pass 

88 

89 def __exit__(self, *args, **kwargs): # type: ignore [no-untyped-def] 

90 pass 

91 

92 

93@decorator 

94def only_when_enabled(f, self, *a, **kw): # type: ignore [no-untyped-def] 

95 """Decorator: return an empty list in the absence of sqlite.""" 

96 if not self.enabled: 

97 return [] 

98 else: 

99 return f(self, *a, **kw) 

100 

101 

102# use 16kB as threshold for whether a corrupt history db should be saved 

103# that should be at least 100 entries or so 

104_SAVE_DB_SIZE = 16384 

105 

106 

107@decorator 

108def catch_corrupt_db(f, self, *a, **kw): # type: ignore [no-untyped-def] 

109 """A decorator which wraps HistoryAccessor method calls to catch errors from 

110 a corrupt SQLite database, move the old database out of the way, and create 

111 a new one. 

112 

113 We avoid clobbering larger databases because this may be triggered due to filesystem issues, 

114 not just a corrupt file. 

115 """ 

116 try: 

117 return f(self, *a, **kw) 

118 except (DatabaseError, OperationalError) as e: 

119 self._corrupt_db_counter += 1 

120 self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e) 

121 if self.hist_file != ":memory:": 

122 if self._corrupt_db_counter > self._corrupt_db_limit: 

123 self.hist_file = ":memory:" 

124 self.log.error( 

125 "Failed to load history too many times, history will not be saved." 

126 ) 

127 elif self.hist_file.is_file(): 

128 # move the file out of the way 

129 base = str(self.hist_file.parent / self.hist_file.stem) 

130 ext = self.hist_file.suffix 

131 size = self.hist_file.stat().st_size 

132 if size >= _SAVE_DB_SIZE: 

133 # if there's significant content, avoid clobbering 

134 now = ( 

135 datetime.datetime.now(datetime.timezone.utc) 

136 .isoformat() 

137 .replace(":", ".") 

138 ) 

139 newpath = base + "-corrupt-" + now + ext 

140 # don't clobber previous corrupt backups 

141 for i in range(100): 

142 if not Path(newpath).exists(): 

143 break 

144 else: 

145 newpath = base + "-corrupt-" + now + ("-%i" % i) + ext 

146 else: 

147 # not much content, possibly empty; don't worry about clobbering 

148 # maybe we should just delete it? 

149 newpath = base + "-corrupt" + ext 

150 self.hist_file.rename(newpath) 

151 self.log.error( 

152 "History file was moved to %s and a new file created.", newpath 

153 ) 

154 self.init_db() 

155 return [] 

156 else: 

157 # Failed with :memory:, something serious is wrong 

158 raise 

159 

160 

161class HistoryAccessorBase(LoggingConfigurable): 

162 """An abstract class for History Accessors""" 

163 

164 def get_tail( 

165 self, 

166 n: int = 10, 

167 raw: bool = True, 

168 output: bool = False, 

169 include_latest: bool = False, 

170 ) -> Iterable[tuple[int, int, InOrInOut]]: 

171 raise NotImplementedError 

172 

173 def search( 

174 self, 

175 pattern: str = "*", 

176 raw: bool = True, 

177 search_raw: bool = True, 

178 output: bool = False, 

179 n: Optional[int] = None, 

180 unique: bool = False, 

181 ) -> Iterable[tuple[int, int, InOrInOut]]: 

182 raise NotImplementedError 

183 

184 def get_range( 

185 self, 

186 session: int, 

187 start: int = 1, 

188 stop: Optional[int] = None, 

189 raw: bool = True, 

190 output: bool = False, 

191 ) -> Iterable[tuple[int, int, InOrInOut]]: 

192 raise NotImplementedError 

193 

194 def get_range_by_str( 

195 self, rangestr: str, raw: bool = True, output: bool = False 

196 ) -> Iterable[tuple[int, int, InOrInOut]]: 

197 raise NotImplementedError 

198 

199 

200class HistoryAccessor(HistoryAccessorBase): 

201 """Access the history database without adding to it. 

202 

203 This is intended for use by standalone history tools. IPython shells use 

204 HistoryManager, below, which is a subclass of this.""" 

205 

206 # counter for init_db retries, so we don't keep trying over and over 

207 _corrupt_db_counter = 0 

208 # after two failures, fallback on :memory: 

209 _corrupt_db_limit = 2 

210 

211 # String holding the path to the history file 

212 hist_file = Union( 

213 [Instance(Path), Unicode()], 

214 help="""Path to file to use for SQLite history database. 

215 

216 By default, IPython will put the history database in the IPython 

217 profile directory. If you would rather share one history among 

218 profiles, you can set this value in each, so that they are consistent. 

219 

220 Due to an issue with fcntl, SQLite is known to misbehave on some NFS 

221 mounts. If you see IPython hanging, try setting this to something on a 

222 local disk, e.g:: 

223 

224 ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite 

225 

226 you can also use the specific value `:memory:` (including the colon 

227 at both end but not the back ticks), to avoid creating an history file. 

228 

229 """, 

230 ).tag(config=True) 

231 

232 enabled = Bool( 

233 sqlite3_found, 

234 help="""enable the SQLite history 

235 

236 set enabled=False to disable the SQLite history, 

237 in which case there will be no stored history, no SQLite connection, 

238 and no background saving thread. This may be necessary in some 

239 threaded environments where IPython is embedded. 

240 """, 

241 ).tag(config=True) 

242 

243 connection_options = Dict( 

244 help="""Options for configuring the SQLite connection 

245 

246 These options are passed as keyword args to sqlite3.connect 

247 when establishing database connections. 

248 """ 

249 ).tag(config=True) 

250 

251 @default("connection_options") 

252 def _default_connection_options(self) -> dict[str, bool]: 

253 return dict(check_same_thread=False) 

254 

255 # The SQLite database 

256 db = Any() 

257 

258 @observe("db") 

259 @only_when_enabled 

260 def _db_changed(self, change): # type: ignore [no-untyped-def] 

261 """validate the db, since it can be an Instance of two different types""" 

262 new = change["new"] 

263 connection_types = (DummyDB, sqlite3.Connection) 

264 if not isinstance(new, connection_types): 

265 msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % ( 

266 self.__class__.__name__, 

267 new, 

268 ) 

269 raise TraitError(msg) 

270 

271 def __init__( 

272 self, profile: str = "default", hist_file: str = "", **traits: typing.Any 

273 ) -> None: 

274 """Create a new history accessor. 

275 

276 Parameters 

277 ---------- 

278 profile : str 

279 The name of the profile from which to open history. 

280 hist_file : str 

281 Path to an SQLite history database stored by IPython. If specified, 

282 hist_file overrides profile. 

283 config : :class:`~traitlets.config.loader.Config` 

284 Config object. hist_file can also be set through this. 

285 """ 

286 super(HistoryAccessor, self).__init__(**traits) 

287 # defer setting hist_file from kwarg until after init, 

288 # otherwise the default kwarg value would clobber any value 

289 # set by config 

290 if hist_file: 

291 self.hist_file = hist_file 

292 

293 try: 

294 self.hist_file 

295 except TraitError: 

296 # No one has set the hist_file, yet. 

297 self.hist_file = self._get_hist_file_name(profile) 

298 

299 self.init_db() 

300 

301 def _get_hist_file_name(self, profile: str = "default") -> Path: 

302 """Find the history file for the given profile name. 

303 

304 This is overridden by the HistoryManager subclass, to use the shell's 

305 active profile. 

306 

307 Parameters 

308 ---------- 

309 profile : str 

310 The name of a profile which has a history file. 

311 """ 

312 return Path(locate_profile(profile)) / "history.sqlite" 

313 

314 @catch_corrupt_db 

315 def init_db(self) -> None: 

316 """Connect to the database, and create tables if necessary.""" 

317 if not self.enabled: 

318 self.db = DummyDB() 

319 return 

320 

321 # use detect_types so that timestamps return datetime objects 

322 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) 

323 kwargs.update(self.connection_options) 

324 self.db = sqlite3.connect(str(self.hist_file), **kwargs) # type: ignore [call-overload] 

325 with self.db: 

326 self.db.execute( 

327 """CREATE TABLE IF NOT EXISTS sessions (session integer 

328 primary key autoincrement, start timestamp, 

329 end timestamp, num_cmds integer, remark text)""" 

330 ) 

331 self.db.execute( 

332 """CREATE TABLE IF NOT EXISTS history 

333 (session integer, line integer, source text, source_raw text, 

334 PRIMARY KEY (session, line))""" 

335 ) 

336 # Output history is optional, but ensure the table's there so it can be 

337 # enabled later. 

338 self.db.execute( 

339 """CREATE TABLE IF NOT EXISTS output_history 

340 (session integer, line integer, output text, 

341 PRIMARY KEY (session, line))""" 

342 ) 

343 # success! reset corrupt db count 

344 self._corrupt_db_counter = 0 

345 

346 def writeout_cache(self) -> None: 

347 """Overridden by HistoryManager to dump the cache before certain 

348 database lookups.""" 

349 pass 

350 

351 ## ------------------------------- 

352 ## Methods for retrieving history: 

353 ## ------------------------------- 

354 def _run_sql( 

355 self, 

356 sql: str, 

357 params: tuple, 

358 raw: bool = True, 

359 output: bool = False, 

360 latest: bool = False, 

361 ) -> Iterable[tuple[int, int, InOrInOut]]: 

362 """Prepares and runs an SQL query for the history database. 

363 

364 Parameters 

365 ---------- 

366 sql : str 

367 Any filtering expressions to go after SELECT ... FROM ... 

368 params : tuple 

369 Parameters passed to the SQL query (to replace "?") 

370 raw, output : bool 

371 See :meth:`get_range` 

372 latest : bool 

373 Select rows with max (session, line) 

374 

375 Returns 

376 ------- 

377 Tuples as :meth:`get_range` 

378 """ 

379 toget = "source_raw" if raw else "source" 

380 sqlfrom = "history" 

381 if output: 

382 sqlfrom = "history LEFT JOIN output_history USING (session, line)" 

383 toget = "history.%s, output_history.output" % toget 

384 if latest: 

385 toget += ", MAX(session * 128 * 1024 + line)" 

386 this_querry = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql 

387 cur = self.db.execute(this_querry, params) 

388 if latest: 

389 cur = (row[:-1] for row in cur) 

390 if output: # Regroup into 3-tuples, and parse JSON 

391 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur) 

392 return cur 

393 

394 @only_when_enabled 

395 @catch_corrupt_db 

396 def get_session_info( 

397 self, session: int 

398 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]: 

399 """Get info about a session. 

400 

401 Parameters 

402 ---------- 

403 session : int 

404 Session number to retrieve. 

405 

406 Returns 

407 ------- 

408 session_id : int 

409 Session ID number 

410 start : datetime 

411 Timestamp for the start of the session. 

412 end : datetime 

413 Timestamp for the end of the session, or None if IPython crashed. 

414 num_cmds : int 

415 Number of commands run, or None if IPython crashed. 

416 remark : str 

417 A manually set description. 

418 """ 

419 query = "SELECT * from sessions where session == ?" 

420 return self.db.execute(query, (session,)).fetchone() 

421 

422 @catch_corrupt_db 

423 def get_last_session_id(self) -> Optional[int]: 

424 """Get the last session ID currently in the database. 

425 

426 Within IPython, this should be the same as the value stored in 

427 :attr:`HistoryManager.session_number`. 

428 """ 

429 for record in self.get_tail(n=1, include_latest=True): 

430 return record[0] 

431 return None 

432 

433 @catch_corrupt_db 

434 def get_tail( 

435 self, 

436 n: int = 10, 

437 raw: bool = True, 

438 output: bool = False, 

439 include_latest: bool = False, 

440 ) -> Iterable[tuple[int, int, InOrInOut]]: 

441 """Get the last n lines from the history database. 

442 

443 Parameters 

444 ---------- 

445 n : int 

446 The number of lines to get 

447 raw, output : bool 

448 See :meth:`get_range` 

449 include_latest : bool 

450 If False (default), n+1 lines are fetched, and the latest one 

451 is discarded. This is intended to be used where the function 

452 is called by a user command, which it should not return. 

453 

454 Returns 

455 ------- 

456 Tuples as :meth:`get_range` 

457 """ 

458 self.writeout_cache() 

459 if not include_latest: 

460 n += 1 

461 cur = self._run_sql( 

462 "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output 

463 ) 

464 if not include_latest: 

465 return reversed(list(cur)[1:]) 

466 return reversed(list(cur)) 

467 

468 @catch_corrupt_db 

469 def search( 

470 self, 

471 pattern: str = "*", 

472 raw: bool = True, 

473 search_raw: bool = True, 

474 output: bool = False, 

475 n: Optional[int] = None, 

476 unique: bool = False, 

477 ) -> Iterable[tuple[int, int, InOrInOut]]: 

478 """Search the database using unix glob-style matching (wildcards 

479 * and ?). 

480 

481 Parameters 

482 ---------- 

483 pattern : str 

484 The wildcarded pattern to match when searching 

485 search_raw : bool 

486 If True, search the raw input, otherwise, the parsed input 

487 raw, output : bool 

488 See :meth:`get_range` 

489 n : None or int 

490 If an integer is given, it defines the limit of 

491 returned entries. 

492 unique : bool 

493 When it is true, return only unique entries. 

494 

495 Returns 

496 ------- 

497 Tuples as :meth:`get_range` 

498 """ 

499 tosearch = "source_raw" if search_raw else "source" 

500 if output: 

501 tosearch = "history." + tosearch 

502 self.writeout_cache() 

503 sqlform = "WHERE %s GLOB ?" % tosearch 

504 params: tuple[typing.Any, ...] = (pattern,) 

505 if unique: 

506 sqlform += " GROUP BY {0}".format(tosearch) 

507 if n is not None: 

508 sqlform += " ORDER BY session DESC, line DESC LIMIT ?" 

509 params += (n,) 

510 elif unique: 

511 sqlform += " ORDER BY session, line" 

512 cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique) 

513 if n is not None: 

514 return reversed(list(cur)) 

515 return cur 

516 

517 @catch_corrupt_db 

518 def get_range( 

519 self, 

520 session: int, 

521 start: int = 1, 

522 stop: Optional[int] = None, 

523 raw: bool = True, 

524 output: bool = False, 

525 ) -> Iterable[tuple[int, int, InOrInOut]]: 

526 """Retrieve input by session. 

527 

528 Parameters 

529 ---------- 

530 session : int 

531 Session number to retrieve. 

532 start : int 

533 First line to retrieve. 

534 stop : int 

535 End of line range (excluded from output itself). If None, retrieve 

536 to the end of the session. 

537 raw : bool 

538 If True, return untranslated input 

539 output : bool 

540 If True, attempt to include output. This will be 'real' Python 

541 objects for the current session, or text reprs from previous 

542 sessions if db_log_output was enabled at the time. Where no output 

543 is found, None is used. 

544 

545 Returns 

546 ------- 

547 entries 

548 An iterator over the desired lines. Each line is a 3-tuple, either 

549 (session, line, input) if output is False, or 

550 (session, line, (input, output)) if output is True. 

551 """ 

552 params: tuple[typing.Any, ...] 

553 if stop: 

554 lineclause = "line >= ? AND line < ?" 

555 params = (session, start, stop) 

556 else: 

557 lineclause = "line>=?" 

558 params = (session, start) 

559 

560 return self._run_sql( 

561 "WHERE session==? AND %s" % lineclause, params, raw=raw, output=output 

562 ) 

563 

564 def get_range_by_str( 

565 self, rangestr: str, raw: bool = True, output: bool = False 

566 ) -> Iterable[tuple[int, int, InOrInOut]]: 

567 """Get lines of history from a string of ranges, as used by magic 

568 commands %hist, %save, %macro, etc. 

569 

570 Parameters 

571 ---------- 

572 rangestr : str 

573 A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used, 

574 this will return everything from current session's history. 

575 

576 See the documentation of :func:`%history` for the full details. 

577 

578 raw, output : bool 

579 As :meth:`get_range` 

580 

581 Returns 

582 ------- 

583 Tuples as :meth:`get_range` 

584 """ 

585 for sess, s, e in extract_hist_ranges(rangestr): 

586 yield from self.get_range(sess, s, e, raw=raw, output=output) 

587 

588 

589@dataclass 

590class HistoryOutput: 

591 output_type: typing.Literal[ 

592 "out_stream", "err_stream", "display_data", "execute_result" 

593 ] 

594 bundle: typing.Dict[str, str | list[str]] 

595 

596 

597class HistoryManager(HistoryAccessor): 

598 """A class to organize all history-related functionality in one place.""" 

599 

600 # Public interface 

601 

602 # An instance of the IPython shell we are attached to 

603 shell = Instance( 

604 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False 

605 ) 

606 # Lists to hold processed and raw history. These start with a blank entry 

607 # so that we can index them starting from 1 

608 input_hist_parsed = List([""]) 

609 input_hist_raw = List([""]) 

610 # A list of directories visited during session 

611 dir_hist: List = List() 

612 

613 @default("dir_hist") 

614 def _dir_hist_default(self) -> list[Path]: 

615 try: 

616 return [Path.cwd()] 

617 except OSError: 

618 return [] 

619 

620 # A dict of output history, keyed with ints from the shell's 

621 # execution count. 

622 output_hist = Dict() 

623 # The text/plain repr of outputs. 

624 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment] 

625 # Maps execution_count to MIME bundles 

626 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list) 

627 # Maps execution_count to exception tracebacks 

628 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment] 

629 

630 # The number of the current session in the history database 

631 session_number: int = Integer() # type: ignore [assignment] 

632 

633 db_log_output = Bool( 

634 False, help="Should the history database include output? (default: no)" 

635 ).tag(config=True) 

636 db_cache_size = Integer( 

637 0, 

638 help="Write to database every x commands (higher values save disk access & power).\n" 

639 "Values of 1 or less effectively disable caching.", 

640 ).tag(config=True) 

641 # The input and output caches 

642 db_input_cache: List[tuple[int, str, str]] = List() 

643 db_output_cache: List[tuple[int, str]] = List() 

644 

645 # History saving in separate thread 

646 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True) 

647 

648 @property 

649 def save_flag(self) -> threading.Event | None: 

650 if self.save_thread is not None: 

651 return self.save_thread.save_flag 

652 return None 

653 

654 # Private interface 

655 # Variables used to store the three last inputs from the user. On each new 

656 # history update, we populate the user's namespace with these, shifted as 

657 # necessary. 

658 _i00 = Unicode("") 

659 _i = Unicode("") 

660 _ii = Unicode("") 

661 _iii = Unicode("") 

662 

663 # A regex matching all forms of the exit command, so that we don't store 

664 # them in the history (it's annoying to rewind the first entry and land on 

665 # an exit call). 

666 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$") 

667 

668 _instances: WeakSet[HistoryManager] = WeakSet() 

669 _max_inst: int | float = float("inf") 

670 

671 def __init__( 

672 self, 

673 shell: InteractiveShell, 

674 config: Optional[Configuration] = None, 

675 **traits: typing.Any, 

676 ): 

677 """Create a new history manager associated with a shell instance.""" 

678 super().__init__(shell=shell, config=config, **traits) 

679 self.db_input_cache_lock = threading.Lock() 

680 self.db_output_cache_lock = threading.Lock() 

681 

682 try: 

683 self.new_session() 

684 except OperationalError: 

685 self.log.error( 

686 "Failed to create history session in %s. History will not be saved.", 

687 self.hist_file, 

688 exc_info=True, 

689 ) 

690 self.hist_file = ":memory:" 

691 

692 if self.enabled and self.hist_file != ":memory:": 

693 self.save_thread = HistorySavingThread(self) 

694 try: 

695 self.save_thread.start() 

696 except RuntimeError: 

697 self.log.error( 

698 "Failed to start history saving thread. History will not be saved.", 

699 exc_info=True, 

700 ) 

701 self.hist_file = ":memory:" 

702 self._instances.add(self) 

703 assert len(HistoryManager._instances) <= HistoryManager._max_inst, ( 

704 len(HistoryManager._instances), 

705 HistoryManager._max_inst, 

706 ) 

707 

708 def __del__(self) -> None: 

709 if self.save_thread is not None: 

710 self.save_thread.stop() 

711 

712 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path: 

713 """Get default history file name based on the Shell's profile. 

714 

715 The profile parameter is ignored, but must exist for compatibility with 

716 the parent class.""" 

717 profile_dir = self.shell.profile_dir.location 

718 return Path(profile_dir) / "history.sqlite" 

719 

720 @only_when_enabled 

721 def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None: 

722 """Get a new session number.""" 

723 if conn is None: 

724 conn = self.db 

725 

726 with conn: 

727 cur = conn.execute( 

728 """INSERT INTO sessions VALUES (NULL, ?, NULL, 

729 NULL, '') """, 

730 (datetime.datetime.now().isoformat(" "),), 

731 ) 

732 assert isinstance(cur.lastrowid, int) 

733 self.session_number = cur.lastrowid 

734 

735 def end_session(self) -> None: 

736 """Close the database session, filling in the end time and line count.""" 

737 self.writeout_cache() 

738 with self.db: 

739 self.db.execute( 

740 """UPDATE sessions SET end=?, num_cmds=? WHERE 

741 session==?""", 

742 ( 

743 datetime.datetime.now(datetime.timezone.utc).isoformat(" "), 

744 len(self.input_hist_parsed) - 1, 

745 self.session_number, 

746 ), 

747 ) 

748 self.session_number = 0 

749 

750 def name_session(self, name: str) -> None: 

751 """Give the current session a name in the history database.""" 

752 warn( 

753 "name_session is deprecated in IPython 9.0 and will be removed in future versions", 

754 DeprecationWarning, 

755 stacklevel=2, 

756 ) 

757 with self.db: 

758 self.db.execute( 

759 "UPDATE sessions SET remark=? WHERE session==?", 

760 (name, self.session_number), 

761 ) 

762 

763 def reset(self, new_session: bool = True) -> None: 

764 """Clear the session history, releasing all object references, and 

765 optionally open a new session.""" 

766 self.output_hist.clear() 

767 self.outputs.clear() 

768 self.exceptions.clear() 

769 

770 # The directory history can't be completely empty 

771 self.dir_hist[:] = [Path.cwd()] 

772 

773 if new_session: 

774 if self.session_number: 

775 self.end_session() 

776 self.input_hist_parsed[:] = [""] 

777 self.input_hist_raw[:] = [""] 

778 self.new_session() 

779 

780 # ------------------------------ 

781 # Methods for retrieving history 

782 # ------------------------------ 

783 def get_session_info( 

784 self, session: int = 0 

785 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]: 

786 """Get info about a session. 

787 

788 Parameters 

789 ---------- 

790 session : int 

791 Session number to retrieve. The current session is 0, and negative 

792 numbers count back from current session, so -1 is the previous session. 

793 

794 Returns 

795 ------- 

796 session_id : int 

797 Session ID number 

798 start : datetime 

799 Timestamp for the start of the session. 

800 end : datetime 

801 Timestamp for the end of the session, or None if IPython crashed. 

802 num_cmds : int 

803 Number of commands run, or None if IPython crashed. 

804 remark : str 

805 A manually set description. 

806 """ 

807 if session <= 0: 

808 session += self.session_number 

809 

810 return super(HistoryManager, self).get_session_info(session=session) 

811 

812 @catch_corrupt_db 

813 def get_tail( 

814 self, 

815 n: int = 10, 

816 raw: bool = True, 

817 output: bool = False, 

818 include_latest: bool = False, 

819 ) -> Iterable[tuple[int, int, InOrInOut]]: 

820 """Get the last n lines from the history database. 

821 

822 Most recent entry last. 

823 

824 Completion will be reordered so that that the last ones are when 

825 possible from current session. 

826 

827 Parameters 

828 ---------- 

829 n : int 

830 The number of lines to get 

831 raw, output : bool 

832 See :meth:`get_range` 

833 include_latest : bool 

834 If False (default), n+1 lines are fetched, and the latest one 

835 is discarded. This is intended to be used where the function 

836 is called by a user command, which it should not return. 

837 

838 Returns 

839 ------- 

840 Tuples as :meth:`get_range` 

841 """ 

842 self.writeout_cache() 

843 if not include_latest: 

844 n += 1 

845 # cursor/line/entry 

846 this_cur = list( 

847 self._run_sql( 

848 "WHERE session == ? ORDER BY line DESC LIMIT ? ", 

849 (self.session_number, n), 

850 raw=raw, 

851 output=output, 

852 ) 

853 ) 

854 other_cur = list( 

855 self._run_sql( 

856 "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?", 

857 (self.session_number, n), 

858 raw=raw, 

859 output=output, 

860 ) 

861 ) 

862 

863 everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur 

864 

865 everything = everything[:n] 

866 

867 if not include_latest: 

868 return list(everything)[:0:-1] 

869 return list(everything)[::-1] 

870 

871 def _get_range_session( 

872 self, 

873 start: int = 1, 

874 stop: Optional[int] = None, 

875 raw: bool = True, 

876 output: bool = False, 

877 ) -> Iterable[tuple[int, int, InOrInOut]]: 

878 """Get input and output history from the current session. Called by 

879 get_range, and takes similar parameters.""" 

880 input_hist = self.input_hist_raw if raw else self.input_hist_parsed 

881 

882 n = len(input_hist) 

883 if start < 0: 

884 start += n 

885 if not stop or (stop > n): 

886 stop = n 

887 elif stop < 0: 

888 stop += n 

889 line: InOrInOut 

890 for i in range(start, stop): 

891 if output: 

892 line = (input_hist[i], self.output_hist_reprs.get(i)) 

893 else: 

894 line = input_hist[i] 

895 yield (0, i, line) 

896 

897 def get_range( 

898 self, 

899 session: int = 0, 

900 start: int = 1, 

901 stop: Optional[int] = None, 

902 raw: bool = True, 

903 output: bool = False, 

904 ) -> Iterable[tuple[int, int, InOrInOut]]: 

905 """Retrieve input by session. 

906 

907 Parameters 

908 ---------- 

909 session : int 

910 Session number to retrieve. The current session is 0, and negative 

911 numbers count back from current session, so -1 is previous session. 

912 start : int 

913 First line to retrieve. 

914 stop : int 

915 End of line range (excluded from output itself). If None, retrieve 

916 to the end of the session. 

917 raw : bool 

918 If True, return untranslated input 

919 output : bool 

920 If True, attempt to include output. This will be 'real' Python 

921 objects for the current session, or text reprs from previous 

922 sessions if db_log_output was enabled at the time. Where no output 

923 is found, None is used. 

924 

925 Returns 

926 ------- 

927 entries 

928 An iterator over the desired lines. Each line is a 3-tuple, either 

929 (session, line, input) if output is False, or 

930 (session, line, (input, output)) if output is True. 

931 """ 

932 if session <= 0: 

933 session += self.session_number 

934 if session == self.session_number: # Current session 

935 return self._get_range_session(start, stop, raw, output) 

936 return super(HistoryManager, self).get_range(session, start, stop, raw, output) 

937 

938 ## ---------------------------- 

939 ## Methods for storing history: 

940 ## ---------------------------- 

941 def store_inputs( 

942 self, line_num: int, source: str, source_raw: Optional[str] = None 

943 ) -> None: 

944 """Store source and raw input in history and create input cache 

945 variables ``_i*``. 

946 

947 Parameters 

948 ---------- 

949 line_num : int 

950 The prompt number of this input. 

951 source : str 

952 Python input. 

953 source_raw : str, optional 

954 If given, this is the raw input without any IPython transformations 

955 applied to it. If not given, ``source`` is used. 

956 """ 

957 if source_raw is None: 

958 source_raw = source 

959 source = source.rstrip("\n") 

960 source_raw = source_raw.rstrip("\n") 

961 

962 # do not store exit/quit commands 

963 if self._exit_re.match(source_raw.strip()): 

964 return 

965 

966 self.input_hist_parsed.append(source) 

967 self.input_hist_raw.append(source_raw) 

968 

969 with self.db_input_cache_lock: 

970 self.db_input_cache.append((line_num, source, source_raw)) 

971 # Trigger to flush cache and write to DB. 

972 if len(self.db_input_cache) >= self.db_cache_size: 

973 if self.save_flag: 

974 self.save_flag.set() 

975 

976 # update the auto _i variables 

977 self._iii = self._ii 

978 self._ii = self._i 

979 self._i = self._i00 

980 self._i00 = source_raw 

981 

982 # hackish access to user namespace to create _i1,_i2... dynamically 

983 new_i = "_i%s" % line_num 

984 to_main = {"_i": self._i, "_ii": self._ii, "_iii": self._iii, new_i: self._i00} 

985 

986 if self.shell is not None: 

987 self.shell.push(to_main, interactive=False) 

988 

989 def store_output(self, line_num: int) -> None: 

990 """If database output logging is enabled, this saves all the 

991 outputs from the indicated prompt number to the database. It's 

992 called by run_cell after code has been executed. 

993 

994 Parameters 

995 ---------- 

996 line_num : int 

997 The line number from which to save outputs 

998 """ 

999 if (not self.db_log_output) or (line_num not in self.output_hist_reprs): 

1000 return 

1001 lnum: int = line_num 

1002 output = self.output_hist_reprs[line_num] 

1003 

1004 with self.db_output_cache_lock: 

1005 self.db_output_cache.append((line_num, output)) 

1006 if self.db_cache_size <= 1 and self.save_flag is not None: 

1007 self.save_flag.set() 

1008 

1009 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None: 

1010 with conn: 

1011 for line in self.db_input_cache: 

1012 conn.execute( 

1013 "INSERT INTO history VALUES (?, ?, ?, ?)", 

1014 (self.session_number,) + line, 

1015 ) 

1016 

1017 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None: 

1018 with conn: 

1019 for line in self.db_output_cache: 

1020 conn.execute( 

1021 "INSERT INTO output_history VALUES (?, ?, ?)", 

1022 (self.session_number,) + line, 

1023 ) 

1024 

1025 @only_when_enabled 

1026 def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None: 

1027 """Write any entries in the cache to the database.""" 

1028 if conn is None: 

1029 conn = self.db 

1030 

1031 with self.db_input_cache_lock: 

1032 try: 

1033 self._writeout_input_cache(conn) 

1034 except sqlite3.IntegrityError: 

1035 self.new_session(conn) 

1036 print( 

1037 "ERROR! Session/line number was not unique in", 

1038 "database. History logging moved to new session", 

1039 self.session_number, 

1040 ) 

1041 try: 

1042 # Try writing to the new session. If this fails, don't 

1043 # recurse 

1044 self._writeout_input_cache(conn) 

1045 except sqlite3.IntegrityError: 

1046 pass 

1047 finally: 

1048 self.db_input_cache = [] 

1049 

1050 with self.db_output_cache_lock: 

1051 try: 

1052 self._writeout_output_cache(conn) 

1053 except sqlite3.IntegrityError: 

1054 print( 

1055 "!! Session/line number for output was not unique", 

1056 "in database. Output will not be stored.", 

1057 ) 

1058 finally: 

1059 self.db_output_cache = [] 

1060 

1061 

1062from collections.abc import Callable, Iterator 

1063from weakref import ReferenceType 

1064 

1065 

1066@contextmanager 

1067def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]: 

1068 """ 

1069 Context manger that hold a reference to a weak ref to make sure it 

1070 is not GC'd during it's context. 

1071 """ 

1072 r = ref() 

1073 yield ref 

1074 del r 

1075 

1076 

1077class HistorySavingThread(threading.Thread): 

1078 """This thread takes care of writing history to the database, so that 

1079 the UI isn't held up while that happens. 

1080 

1081 It waits for the HistoryManager's save_flag to be set, then writes out 

1082 the history cache. The main thread is responsible for setting the flag when 

1083 the cache size reaches a defined threshold.""" 

1084 

1085 save_flag: threading.Event 

1086 daemon: bool = True 

1087 _stop_now: bool = False 

1088 enabled: bool = True 

1089 history_manager: ref[HistoryManager] 

1090 _stopped = False 

1091 

1092 def __init__(self, history_manager: HistoryManager) -> None: 

1093 super(HistorySavingThread, self).__init__(name="IPythonHistorySavingThread") 

1094 self.history_manager = ref(history_manager) 

1095 self.enabled = history_manager.enabled 

1096 self.save_flag = threading.Event() 

1097 

1098 @only_when_enabled 

1099 def run(self) -> None: 

1100 atexit.register(self.stop) 

1101 # We need a separate db connection per thread: 

1102 try: 

1103 hm: ReferenceType[HistoryManager] 

1104 with hold(self.history_manager) as hm: 

1105 if hm() is not None: 

1106 self.db = sqlite3.connect( 

1107 str(hm().hist_file), # type: ignore [union-attr] 

1108 **hm().connection_options, # type: ignore [union-attr] 

1109 ) 

1110 while True: 

1111 self.save_flag.wait() 

1112 with hold(self.history_manager) as hm: 

1113 if hm() is None: 

1114 self._stop_now = True 

1115 if self._stop_now: 

1116 self.db.close() 

1117 return 

1118 self.save_flag.clear() 

1119 if hm() is not None: 

1120 hm().writeout_cache(self.db) # type: ignore [union-attr] 

1121 

1122 except Exception as e: 

1123 print( 

1124 ( 

1125 "The history saving thread hit an unexpected error (%s)." 

1126 "History will not be written to the database." 

1127 ) 

1128 % repr(e) 

1129 ) 

1130 finally: 

1131 atexit.unregister(self.stop) 

1132 

1133 def stop(self) -> None: 

1134 """This can be called from the main thread to safely stop this thread. 

1135 

1136 Note that it does not attempt to write out remaining history before 

1137 exiting. That should be done by calling the HistoryManager's 

1138 end_session method.""" 

1139 if self._stopped: 

1140 return 

1141 self._stop_now = True 

1142 

1143 self.save_flag.set() 

1144 self._stopped = True 

1145 if self != threading.current_thread(): 

1146 self.join() 

1147 

1148 def __del__(self) -> None: 

1149 self.stop() 

1150 

1151 

1152# To match, e.g. ~5/8-~2/3, or ~4 (without trailing slash for full session) 

1153# Session numbers: ~N or N/ 

1154# Line numbers: N (just digits, no ~) 

1155# Range syntax: 4-6 (with end) or 4- (without end, means "onward") 

1156range_re = re.compile( 

1157 r""" 

1158((?P<startsess>(?:~?\d+/)))? 

1159(?P<start>\d+)? 

1160((?P<sep>[\-:]) 

1161 ((?P<endsess>(?:~?\d+/)))? 

1162 (?P<end>\d*))? 

1163$""", 

1164 re.VERBOSE, 

1165) 

1166 

1167 

1168def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]: 

1169 """Turn a string of history ranges into 3-tuples of (session, start, stop). 

1170 

1171 Empty string results in a `[(0, 1, None)]`, i.e. "everything from current 

1172 session". 

1173 

1174 Examples 

1175 -------- 

1176 >>> list(extract_hist_ranges("~8/5-~7/4 2")) 

1177 [(-8, 5, None), (-7, 1, 5), (0, 2, 3)] 

1178 >>> list(extract_hist_ranges("~4/")) 

1179 [(-4, 1, None)] 

1180 >>> list(extract_hist_ranges("4-")) 

1181 [(0, 4, None)] 

1182 >>> list(extract_hist_ranges("~4/4-")) 

1183 [(-4, 4, None)] 

1184 """ 

1185 if ranges_str == "": 

1186 yield (0, 1, None) # Everything from current session 

1187 return 

1188 

1189 for range_str in ranges_str.split(): 

1190 rmatch = range_re.match(range_str) 

1191 if not rmatch: 

1192 continue 

1193 start = rmatch.group("start") 

1194 sep = rmatch.group("sep") 

1195 if start: 

1196 start = int(start) 

1197 end = rmatch.group("end") 

1198 if sep == "-": 

1199 end = (int(end) + 1) if end else None 

1200 else: 

1201 end = int(end) if end else start + 1 

1202 else: 

1203 if not rmatch.group("startsess"): 

1204 continue 

1205 start = 1 

1206 end = None 

1207 startsess = rmatch.group("startsess") or "0" 

1208 endsess = rmatch.group("endsess") or startsess 

1209 startsess = startsess.rstrip("/") 

1210 endsess = endsess.rstrip("/") 

1211 startsess = int(startsess.replace("~", "-")) 

1212 endsess = int(endsess.replace("~", "-")) 

1213 assert endsess >= startsess, "start session must be earlier than end session" 

1214 

1215 if endsess == startsess: 

1216 yield (startsess, start, end) 

1217 continue 

1218 # Multiple sessions in one range: 

1219 yield (startsess, start, None) 

1220 for sess in range(startsess + 1, endsess): 

1221 yield (sess, 1, None) 

1222 yield (endsess, 1, end) 

1223 

1224 

1225def _format_lineno(session: int, line: int) -> str: 

1226 """Helper function to format line numbers properly.""" 

1227 if session == 0: 

1228 return str(line) 

1229 return "%s#%s" % (session, line)