Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

489 statements  

1"""History related magics and functionality""" 

2 

3from __future__ import annotations 

4 

5# Copyright (c) IPython Development Team. 

6# Distributed under the terms of the Modified BSD License. 

7 

8 

9import atexit 

10import datetime 

11import os 

12import re 

13 

14 

15import threading 

16from pathlib import Path 

17 

18from collections import defaultdict 

19from contextlib import contextmanager 

20from dataclasses import dataclass 

21from decorator import decorator 

22from traitlets import ( 

23 Any, 

24 Bool, 

25 Dict, 

26 Instance, 

27 Integer, 

28 List, 

29 TraitError, 

30 Unicode, 

31 Union, 

32 default, 

33 observe, 

34) 

35from traitlets.config.configurable import LoggingConfigurable 

36 

37from IPython.paths import locate_profile 

38from IPython.utils.decorators import undoc 

39from typing import Tuple, Optional, TYPE_CHECKING 

40from collections.abc import Iterable 

41import typing 

42from warnings import warn 

43from weakref import ref, WeakSet 

44 

45if TYPE_CHECKING: 

46 from IPython.core.interactiveshell import InteractiveShell 

47 from IPython.config.Configuration import Configuration 

48 

# sqlite3 may be missing from the running interpreter, so it is imported
# defensively; ``sqlite3_found`` records whether the import succeeded.
try:
    from sqlite3 import DatabaseError, OperationalError
    import sqlite3

    # Columns declared as "timestamp" are decoded from their stored ISO-format
    # bytes into datetime objects (effective on connections opened with
    # detect_types, as init_db does).
    sqlite3.register_converter(
        "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode())
    )

    sqlite3_found = True
except ModuleNotFoundError:
    sqlite3_found = False

    # Placeholder exception types so that ``except (DatabaseError,
    # OperationalError)`` clauses in this module still resolve without sqlite3.
    class DatabaseError(Exception):  # type: ignore [no-redef]
        pass

    class OperationalError(Exception):  # type: ignore [no-redef]
        pass

66 

67 

68InOrInOut = typing.Union[str, tuple[str, Optional[str]]] 

69 

70# ----------------------------------------------------------------------------- 

71# Classes and functions 

72# ----------------------------------------------------------------------------- 

73 

74 

@undoc
class DummyDB:
    """Null-object stand-in for a SQLite connection.

    Installed as the ``db`` of a history accessor whose history is not
    enabled: queries produce no rows, and committing or using the object as a
    context manager does nothing.
    """

    def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
        # The instance itself is absorbed by *args; no query ever has rows.
        return list()

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        return None

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        return None

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        # Returning None (falsy) lets exceptions from the ``with`` body propagate.
        return None

92 

93 

@decorator
def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: call the wrapped method only while ``self.enabled`` is true.

    When history is disabled the method is skipped entirely and an empty list
    is returned in its place.
    """
    if self.enabled:
        return f(self, *a, **kw)
    return []

101 

102 

103# use 16kB as threshold for whether a corrupt history db should be saved 

104# that should be at least 100 entries or so 

105_SAVE_DB_SIZE = 16384 

106 

107 

@decorator
def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: recover from a corrupt or unreadable SQLite history database.

    Wraps HistoryAccessor methods.  When the wrapped call raises
    ``DatabaseError`` or ``OperationalError``, the history file is moved out of
    the way, a new database is set up through ``self.init_db()`` and an empty
    list is returned instead of the method's result.

    Files of at least ``_SAVE_DB_SIZE`` bytes are renamed to a timestamped
    ``-corrupt-`` name that avoids existing backups, because the failure may
    come from a filesystem problem rather than a genuinely corrupt file.
    Smaller files are renamed to a fixed ``-corrupt`` name without checking
    for an earlier backup.

    Once ``self._corrupt_db_counter`` exceeds ``self._corrupt_db_limit`` the
    accessor switches ``self.hist_file`` to ``":memory:"``.  A failure while
    already using ``":memory:"`` is re-raised.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if self.hist_file == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # hist_file is either a Path or a plain string (both are accepted
            # by the trait); a string has no is_file()/stat()/rename(), so
            # normalise before touching the filesystem.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                base = str(hist_path.parent / hist_path.stem)
                ext = hist_path.suffix
                size = hist_path.stat().st_size
                if size >= _SAVE_DB_SIZE:
                    # if there's significant content, avoid clobbering
                    now = (
                        datetime.datetime.now(datetime.timezone.utc)
                        .isoformat()
                        .replace(":", ".")
                    )
                    newpath = base + "-corrupt-" + now + ext
                    # don't clobber previous corrupt backups
                    for i in range(100):
                        if not Path(newpath).exists():
                            break
                        newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
                else:
                    # not much content, possibly empty; don't worry about clobbering
                    # maybe we should just delete it?
                    newpath = base + "-corrupt" + ext
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []

160 

161 

class HistoryAccessorBase(LoggingConfigurable):
    """Abstract interface shared by history accessors.

    Every method here raises ``NotImplementedError``; concrete accessors such
    as :class:`HistoryAccessor` provide the actual lookups.
    """

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the last ``n`` history entries (abstract)."""
        raise NotImplementedError()

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the history entries matching ``pattern`` (abstract)."""
        raise NotImplementedError()

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the entries of one session between ``start`` and ``stop`` (abstract)."""
        raise NotImplementedError()

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the entries described by a range string (abstract)."""
        raise NotImplementedError()

199 

200 

201class HistoryAccessor(HistoryAccessorBase): 

202 """Access the history database without adding to it. 

203 

204 This is intended for use by standalone history tools. IPython shells use 

205 HistoryManager, below, which is a subclass of this.""" 

206 

207 # counter for init_db retries, so we don't keep trying over and over 

208 _corrupt_db_counter = 0 

209 # after two failures, fallback on :memory: 

210 _corrupt_db_limit = 2 

211 

212 # String holding the path to the history file 

213 hist_file = Union( 

214 [Instance(Path), Unicode()], 

215 help="""Path to file to use for SQLite history database. 

216 

217 By default, IPython will put the history database in the IPython 

218 profile directory. If you would rather share one history among 

219 profiles, you can set this value in each, so that they are consistent. 

220 

221 Due to an issue with fcntl, SQLite is known to misbehave on some NFS 

222 mounts. If you see IPython hanging, try setting this to something on a 

223 local disk, e.g:: 

224 

225 ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite 

226 

227 you can also use the specific value `:memory:` (including the colon 

228 at both end but not the back ticks), to avoid creating an history file. 

229 

230 """, 

231 ).tag(config=True) 

232 

233 enabled = Bool( 

234 sqlite3_found, 

235 help="""enable the SQLite history 

236 

237 set enabled=False to disable the SQLite history, 

238 in which case there will be no stored history, no SQLite connection, 

239 and no background saving thread. This may be necessary in some 

240 threaded environments where IPython is embedded. 

241 """, 

242 ).tag(config=True) 

243 

244 connection_options = Dict( 

245 help="""Options for configuring the SQLite connection 

246 

247 These options are passed as keyword args to sqlite3.connect 

248 when establishing database connections. 

249 """ 

250 ).tag(config=True) 

251 

252 @default("connection_options") 

253 def _default_connection_options(self) -> dict[str, bool]: 

254 return dict(check_same_thread=False) 

255 

256 # The SQLite database 

257 db = Any() 

258 

259 @observe("db") 

260 @only_when_enabled 

261 def _db_changed(self, change): # type: ignore [no-untyped-def] 

262 """validate the db, since it can be an Instance of two different types""" 

263 new = change["new"] 

264 connection_types = (DummyDB, sqlite3.Connection) 

265 if not isinstance(new, connection_types): 

266 msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % ( 

267 self.__class__.__name__, 

268 new, 

269 ) 

270 raise TraitError(msg) 

271 

272 def __init__( 

273 self, profile: str = "default", hist_file: str = "", **traits: typing.Any 

274 ) -> None: 

275 """Create a new history accessor. 

276 

277 Parameters 

278 ---------- 

279 profile : str 

280 The name of the profile from which to open history. 

281 hist_file : str 

282 Path to an SQLite history database stored by IPython. If specified, 

283 hist_file overrides profile. 

284 config : :class:`~traitlets.config.loader.Config` 

285 Config object. hist_file can also be set through this. 

286 """ 

287 super(HistoryAccessor, self).__init__(**traits) 

288 # defer setting hist_file from kwarg until after init, 

289 # otherwise the default kwarg value would clobber any value 

290 # set by config 

291 if hist_file: 

292 self.hist_file = hist_file 

293 

294 try: 

295 self.hist_file 

296 except TraitError: 

297 # No one has set the hist_file, yet. 

298 self.hist_file = self._get_hist_file_name(profile) 

299 

300 self.init_db() 

301 

302 def _get_hist_file_name(self, profile: str = "default") -> Path: 

303 """Find the history file for the given profile name. 

304 

305 This is overridden by the HistoryManager subclass, to use the shell's 

306 active profile. 

307 

308 Parameters 

309 ---------- 

310 profile : str 

311 The name of a profile which has a history file. 

312 """ 

313 return Path(locate_profile(profile)) / "history.sqlite" 

314 

315 @catch_corrupt_db 

316 def init_db(self) -> None: 

317 """Connect to the database, and create tables if necessary.""" 

318 if not self.enabled: 

319 self.db = DummyDB() 

320 return 

321 

322 # use detect_types so that timestamps return datetime objects 

323 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) 

324 kwargs.update(self.connection_options) 

325 self.db = sqlite3.connect(str(self.hist_file), **kwargs) # type: ignore [call-overload] 

326 with self.db: 

327 self.db.execute( 

328 """CREATE TABLE IF NOT EXISTS sessions (session integer 

329 primary key autoincrement, start timestamp, 

330 end timestamp, num_cmds integer, remark text)""" 

331 ) 

332 self.db.execute( 

333 """CREATE TABLE IF NOT EXISTS history 

334 (session integer, line integer, source text, source_raw text, 

335 PRIMARY KEY (session, line))""" 

336 ) 

337 # Output history is optional, but ensure the table's there so it can be 

338 # enabled later. 

339 self.db.execute( 

340 """CREATE TABLE IF NOT EXISTS output_history 

341 (session integer, line integer, output text, 

342 PRIMARY KEY (session, line))""" 

343 ) 

344 # success! reset corrupt db count 

345 self._corrupt_db_counter = 0 

346 

347 def writeout_cache(self) -> None: 

348 """Overridden by HistoryManager to dump the cache before certain 

349 database lookups.""" 

350 pass 

351 

352 ## ------------------------------- 

353 ## Methods for retrieving history: 

354 ## ------------------------------- 

355 def _run_sql( 

356 self, 

357 sql: str, 

358 params: tuple, 

359 raw: bool = True, 

360 output: bool = False, 

361 latest: bool = False, 

362 ) -> Iterable[tuple[int, int, InOrInOut]]: 

363 """Prepares and runs an SQL query for the history database. 

364 

365 Parameters 

366 ---------- 

367 sql : str 

368 Any filtering expressions to go after SELECT ... FROM ... 

369 params : tuple 

370 Parameters passed to the SQL query (to replace "?") 

371 raw, output : bool 

372 See :meth:`get_range` 

373 latest : bool 

374 Select rows with max (session, line) 

375 

376 Returns 

377 ------- 

378 Tuples as :meth:`get_range` 

379 """ 

380 toget = "source_raw" if raw else "source" 

381 sqlfrom = "history" 

382 if output: 

383 sqlfrom = "history LEFT JOIN output_history USING (session, line)" 

384 toget = "history.%s, output_history.output" % toget 

385 if latest: 

386 toget += ", MAX(session * 128 * 1024 + line)" 

387 this_querry = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql 

388 cur = self.db.execute(this_querry, params) 

389 if latest: 

390 cur = (row[:-1] for row in cur) 

391 if output: # Regroup into 3-tuples, and parse JSON 

392 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur) 

393 return cur 

394 

395 @only_when_enabled 

396 @catch_corrupt_db 

397 def get_session_info( 

398 self, session: int 

399 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]: 

400 """Get info about a session. 

401 

402 Parameters 

403 ---------- 

404 session : int 

405 Session number to retrieve. 

406 

407 Returns 

408 ------- 

409 session_id : int 

410 Session ID number 

411 start : datetime 

412 Timestamp for the start of the session. 

413 end : datetime 

414 Timestamp for the end of the session, or None if IPython crashed. 

415 num_cmds : int 

416 Number of commands run, or None if IPython crashed. 

417 remark : str 

418 A manually set description. 

419 """ 

420 query = "SELECT * from sessions where session == ?" 

421 return self.db.execute(query, (session,)).fetchone() 

422 

423 @catch_corrupt_db 

424 def get_last_session_id(self) -> Optional[int]: 

425 """Get the last session ID currently in the database. 

426 

427 Within IPython, this should be the same as the value stored in 

428 :attr:`HistoryManager.session_number`. 

429 """ 

430 for record in self.get_tail(n=1, include_latest=True): 

431 return record[0] 

432 return None 

433 

434 @catch_corrupt_db 

435 def get_tail( 

436 self, 

437 n: int = 10, 

438 raw: bool = True, 

439 output: bool = False, 

440 include_latest: bool = False, 

441 ) -> Iterable[tuple[int, int, InOrInOut]]: 

442 """Get the last n lines from the history database. 

443 

444 Parameters 

445 ---------- 

446 n : int 

447 The number of lines to get 

448 raw, output : bool 

449 See :meth:`get_range` 

450 include_latest : bool 

451 If False (default), n+1 lines are fetched, and the latest one 

452 is discarded. This is intended to be used where the function 

453 is called by a user command, which it should not return. 

454 

455 Returns 

456 ------- 

457 Tuples as :meth:`get_range` 

458 """ 

459 self.writeout_cache() 

460 if not include_latest: 

461 n += 1 

462 cur = self._run_sql( 

463 "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output 

464 ) 

465 if not include_latest: 

466 return reversed(list(cur)[1:]) 

467 return reversed(list(cur)) 

468 

469 @catch_corrupt_db 

470 def search( 

471 self, 

472 pattern: str = "*", 

473 raw: bool = True, 

474 search_raw: bool = True, 

475 output: bool = False, 

476 n: Optional[int] = None, 

477 unique: bool = False, 

478 ) -> Iterable[tuple[int, int, InOrInOut]]: 

479 """Search the database using unix glob-style matching (wildcards 

480 * and ?). 

481 

482 Parameters 

483 ---------- 

484 pattern : str 

485 The wildcarded pattern to match when searching 

486 search_raw : bool 

487 If True, search the raw input, otherwise, the parsed input 

488 raw, output : bool 

489 See :meth:`get_range` 

490 n : None or int 

491 If an integer is given, it defines the limit of 

492 returned entries. 

493 unique : bool 

494 When it is true, return only unique entries. 

495 

496 Returns 

497 ------- 

498 Tuples as :meth:`get_range` 

499 """ 

500 tosearch = "source_raw" if search_raw else "source" 

501 if output: 

502 tosearch = "history." + tosearch 

503 self.writeout_cache() 

504 sqlform = "WHERE %s GLOB ?" % tosearch 

505 params: tuple[typing.Any, ...] = (pattern,) 

506 if unique: 

507 sqlform += " GROUP BY {0}".format(tosearch) 

508 if n is not None: 

509 sqlform += " ORDER BY session DESC, line DESC LIMIT ?" 

510 params += (n,) 

511 elif unique: 

512 sqlform += " ORDER BY session, line" 

513 cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique) 

514 if n is not None: 

515 return reversed(list(cur)) 

516 return cur 

517 

518 @catch_corrupt_db 

519 def get_range( 

520 self, 

521 session: int, 

522 start: int = 1, 

523 stop: Optional[int] = None, 

524 raw: bool = True, 

525 output: bool = False, 

526 ) -> Iterable[tuple[int, int, InOrInOut]]: 

527 """Retrieve input by session. 

528 

529 Parameters 

530 ---------- 

531 session : int 

532 Session number to retrieve. 

533 start : int 

534 First line to retrieve. 

535 stop : int 

536 End of line range (excluded from output itself). If None, retrieve 

537 to the end of the session. 

538 raw : bool 

539 If True, return untranslated input 

540 output : bool 

541 If True, attempt to include output. This will be 'real' Python 

542 objects for the current session, or text reprs from previous 

543 sessions if db_log_output was enabled at the time. Where no output 

544 is found, None is used. 

545 

546 Returns 

547 ------- 

548 entries 

549 An iterator over the desired lines. Each line is a 3-tuple, either 

550 (session, line, input) if output is False, or 

551 (session, line, (input, output)) if output is True. 

552 """ 

553 params: tuple[typing.Any, ...] 

554 if stop: 

555 lineclause = "line >= ? AND line < ?" 

556 params = (session, start, stop) 

557 else: 

558 lineclause = "line>=?" 

559 params = (session, start) 

560 

561 return self._run_sql( 

562 "WHERE session==? AND %s" % lineclause, params, raw=raw, output=output 

563 ) 

564 

565 def get_range_by_str( 

566 self, rangestr: str, raw: bool = True, output: bool = False 

567 ) -> Iterable[tuple[int, int, InOrInOut]]: 

568 """Get lines of history from a string of ranges, as used by magic 

569 commands %hist, %save, %macro, etc. 

570 

571 Parameters 

572 ---------- 

573 rangestr : str 

574 A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used, 

575 this will return everything from current session's history. 

576 

577 See the documentation of :func:`%history` for the full details. 

578 

579 raw, output : bool 

580 As :meth:`get_range` 

581 

582 Returns 

583 ------- 

584 Tuples as :meth:`get_range` 

585 """ 

586 for sess, s, e in extract_hist_ranges(rangestr): 

587 yield from self.get_range(sess, s, e, raw=raw, output=output) 

588 

589 

@dataclass
class HistoryOutput:
    """One recorded output of an execution: its kind plus its data bundle."""

    # Which of the four output channels produced this entry.
    output_type: typing.Literal[
        "out_stream",
        "err_stream",
        "display_data",
        "execute_result",
    ]
    # String keys mapped to a string or a list of strings.
    bundle: typing.Dict[str, str | list[str]]

596 

597 

598class HistoryManager(HistoryAccessor): 

599 """A class to organize all history-related functionality in one place.""" 

600 

601 # Public interface 

602 

603 # An instance of the IPython shell we are attached to 

604 shell = Instance( 

605 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False 

606 ) 

607 # Lists to hold processed and raw history. These start with a blank entry 

608 # so that we can index them starting from 1 

609 input_hist_parsed = List([""]) 

610 input_hist_raw = List([""]) 

611 # A list of directories visited during session 

612 dir_hist: List = List() 

613 

614 @default("dir_hist") 

615 def _dir_hist_default(self) -> list[Path]: 

616 try: 

617 return [Path.cwd()] 

618 except OSError: 

619 return [] 

620 

621 # A dict of output history, keyed with ints from the shell's 

622 # execution count. 

623 output_hist = Dict() 

624 # The text/plain repr of outputs. 

625 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment] 

626 # Maps execution_count to MIME bundles 

627 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list) 

628 # Maps execution_count to exception tracebacks 

629 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment] 

630 

631 # The number of the current session in the history database 

632 session_number: int = Integer() # type: ignore [assignment] 

633 

634 db_log_output = Bool( 

635 False, help="Should the history database include output? (default: no)" 

636 ).tag(config=True) 

637 db_cache_size = Integer( 

638 0, 

639 help="Write to database every x commands (higher values save disk access & power).\n" 

640 "Values of 1 or less effectively disable caching.", 

641 ).tag(config=True) 

642 # The input and output caches 

643 db_input_cache: List[tuple[int, str, str]] = List() 

644 db_output_cache: List[tuple[int, str]] = List() 

645 

646 # History saving in separate thread 

647 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True) 

648 

649 @property 

650 def save_flag(self) -> threading.Event | None: 

651 if self.save_thread is not None: 

652 return self.save_thread.save_flag 

653 return None 

654 

655 # Private interface 

656 # Variables used to store the three last inputs from the user. On each new 

657 # history update, we populate the user's namespace with these, shifted as 

658 # necessary. 

659 _i00 = Unicode("") 

660 _i = Unicode("") 

661 _ii = Unicode("") 

662 _iii = Unicode("") 

663 

664 # A regex matching all forms of the exit command, so that we don't store 

665 # them in the history (it's annoying to rewind the first entry and land on 

666 # an exit call). 

667 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$") 

668 

669 _instances: WeakSet[HistoryManager] = WeakSet() 

670 _max_inst: int | float = float("inf") 

671 

672 def __init__( 

673 self, 

674 shell: InteractiveShell, 

675 config: Optional[Configuration] = None, 

676 **traits: typing.Any, 

677 ): 

678 """Create a new history manager associated with a shell instance.""" 

679 super().__init__(shell=shell, config=config, **traits) 

680 self.db_input_cache_lock = threading.Lock() 

681 self.db_output_cache_lock = threading.Lock() 

682 

683 try: 

684 self.new_session() 

685 except OperationalError: 

686 self.log.error( 

687 "Failed to create history session in %s. History will not be saved.", 

688 self.hist_file, 

689 exc_info=True, 

690 ) 

691 self.hist_file = ":memory:" 

692 

693 self.using_thread = False 

694 if self.enabled and self.hist_file != ":memory:": 

695 self.save_thread = HistorySavingThread(self) 

696 try: 

697 self.save_thread.start() 

698 except RuntimeError: 

699 self.log.error( 

700 "Failed to start history saving thread. History will not be saved.", 

701 exc_info=True, 

702 ) 

703 self.hist_file = ":memory:" 

704 else: 

705 self.using_thread = True 

706 self._instances.add(self) 

707 assert len(HistoryManager._instances) <= HistoryManager._max_inst, ( 

708 len(HistoryManager._instances), 

709 HistoryManager._max_inst, 

710 ) 

711 

712 def __del__(self) -> None: 

713 if self.save_thread is not None: 

714 self.save_thread.stop() 

715 

716 @classmethod 

717 def _stop_thread(cls) -> None: 

718 # Used before forking so the thread isn't running at fork 

719 for inst in cls._instances: 

720 if inst.save_thread is not None: 

721 inst.save_thread.stop() 

722 inst.save_thread = None 

723 

724 def _restart_thread_if_stopped(self) -> None: 

725 # Start the thread again after it was stopped for forking 

726 if self.save_thread is None and self.using_thread: 

727 self.save_thread = HistorySavingThread(self) 

728 self.save_thread.start() 

729 

730 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path: 

731 """Get default history file name based on the Shell's profile. 

732 

733 The profile parameter is ignored, but must exist for compatibility with 

734 the parent class.""" 

735 profile_dir = self.shell.profile_dir.location 

736 return Path(profile_dir) / "history.sqlite" 

737 

738 @only_when_enabled 

739 def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None: 

740 """Get a new session number.""" 

741 if conn is None: 

742 conn = self.db 

743 

744 with conn: 

745 cur = conn.execute( 

746 """INSERT INTO sessions VALUES (NULL, ?, NULL, 

747 NULL, '') """, 

748 (datetime.datetime.now().isoformat(" "),), 

749 ) 

750 assert isinstance(cur.lastrowid, int) 

751 self.session_number = cur.lastrowid 

752 

753 def end_session(self) -> None: 

754 """Close the database session, filling in the end time and line count.""" 

755 self.writeout_cache() 

756 with self.db: 

757 self.db.execute( 

758 """UPDATE sessions SET end=?, num_cmds=? WHERE 

759 session==?""", 

760 ( 

761 datetime.datetime.now(datetime.timezone.utc).isoformat(" "), 

762 len(self.input_hist_parsed) - 1, 

763 self.session_number, 

764 ), 

765 ) 

766 self.session_number = 0 

767 

768 def name_session(self, name: str) -> None: 

769 """Give the current session a name in the history database.""" 

770 warn( 

771 "name_session is deprecated in IPython 9.0 and will be removed in future versions", 

772 DeprecationWarning, 

773 stacklevel=2, 

774 ) 

775 with self.db: 

776 self.db.execute( 

777 "UPDATE sessions SET remark=? WHERE session==?", 

778 (name, self.session_number), 

779 ) 

780 

781 def reset(self, new_session: bool = True) -> None: 

782 """Clear the session history, releasing all object references, and 

783 optionally open a new session.""" 

784 self.output_hist.clear() 

785 self.outputs.clear() 

786 self.exceptions.clear() 

787 

788 # The directory history can't be completely empty 

789 self.dir_hist[:] = [Path.cwd()] 

790 

791 if new_session: 

792 if self.session_number: 

793 self.end_session() 

794 self.input_hist_parsed[:] = [""] 

795 self.input_hist_raw[:] = [""] 

796 self.new_session() 

797 

798 # ------------------------------ 

799 # Methods for retrieving history 

800 # ------------------------------ 

801 def get_session_info( 

802 self, session: int = 0 

803 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]: 

804 """Get info about a session. 

805 

806 Parameters 

807 ---------- 

808 session : int 

809 Session number to retrieve. The current session is 0, and negative 

810 numbers count back from current session, so -1 is the previous session. 

811 

812 Returns 

813 ------- 

814 session_id : int 

815 Session ID number 

816 start : datetime 

817 Timestamp for the start of the session. 

818 end : datetime 

819 Timestamp for the end of the session, or None if IPython crashed. 

820 num_cmds : int 

821 Number of commands run, or None if IPython crashed. 

822 remark : str 

823 A manually set description. 

824 """ 

825 if session <= 0: 

826 session += self.session_number 

827 

828 return super(HistoryManager, self).get_session_info(session=session) 

829 

830 @catch_corrupt_db 

831 def get_tail( 

832 self, 

833 n: int = 10, 

834 raw: bool = True, 

835 output: bool = False, 

836 include_latest: bool = False, 

837 ) -> Iterable[tuple[int, int, InOrInOut]]: 

838 """Get the last n lines from the history database. 

839 

840 Most recent entry last. 

841 

842 Completion will be reordered so that that the last ones are when 

843 possible from current session. 

844 

845 Parameters 

846 ---------- 

847 n : int 

848 The number of lines to get 

849 raw, output : bool 

850 See :meth:`get_range` 

851 include_latest : bool 

852 If False (default), n+1 lines are fetched, and the latest one 

853 is discarded. This is intended to be used where the function 

854 is called by a user command, which it should not return. 

855 

856 Returns 

857 ------- 

858 Tuples as :meth:`get_range` 

859 """ 

860 self.writeout_cache() 

861 if not include_latest: 

862 n += 1 

863 # cursor/line/entry 

864 this_cur = list( 

865 self._run_sql( 

866 "WHERE session == ? ORDER BY line DESC LIMIT ? ", 

867 (self.session_number, n), 

868 raw=raw, 

869 output=output, 

870 ) 

871 ) 

872 other_cur = list( 

873 self._run_sql( 

874 "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?", 

875 (self.session_number, n), 

876 raw=raw, 

877 output=output, 

878 ) 

879 ) 

880 

881 everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur 

882 

883 everything = everything[:n] 

884 

885 if not include_latest: 

886 return list(everything)[:0:-1] 

887 return list(everything)[::-1] 

888 

889 def _get_range_session( 

890 self, 

891 start: int = 1, 

892 stop: Optional[int] = None, 

893 raw: bool = True, 

894 output: bool = False, 

895 ) -> Iterable[tuple[int, int, InOrInOut]]: 

896 """Get input and output history from the current session. Called by 

897 get_range, and takes similar parameters.""" 

898 input_hist = self.input_hist_raw if raw else self.input_hist_parsed 

899 

900 n = len(input_hist) 

901 if start < 0: 

902 start += n 

903 if not stop or (stop > n): 

904 stop = n 

905 elif stop < 0: 

906 stop += n 

907 line: InOrInOut 

908 for i in range(start, stop): 

909 if output: 

910 line = (input_hist[i], self.output_hist_reprs.get(i)) 

911 else: 

912 line = input_hist[i] 

913 yield (0, i, line) 

914 

915 def get_range( 

916 self, 

917 session: int = 0, 

918 start: int = 1, 

919 stop: Optional[int] = None, 

920 raw: bool = True, 

921 output: bool = False, 

922 ) -> Iterable[tuple[int, int, InOrInOut]]: 

923 """Retrieve input by session. 

924 

925 Parameters 

926 ---------- 

927 session : int 

928 Session number to retrieve. The current session is 0, and negative 

929 numbers count back from current session, so -1 is previous session. 

930 start : int 

931 First line to retrieve. 

932 stop : int 

933 End of line range (excluded from output itself). If None, retrieve 

934 to the end of the session. 

935 raw : bool 

936 If True, return untranslated input 

937 output : bool 

938 If True, attempt to include output. This will be 'real' Python 

939 objects for the current session, or text reprs from previous 

940 sessions if db_log_output was enabled at the time. Where no output 

941 is found, None is used. 

942 

943 Returns 

944 ------- 

945 entries 

946 An iterator over the desired lines. Each line is a 3-tuple, either 

947 (session, line, input) if output is False, or 

948 (session, line, (input, output)) if output is True. 

949 """ 

950 if session <= 0: 

951 session += self.session_number 

952 if session == self.session_number: # Current session 

953 return self._get_range_session(start, stop, raw, output) 

954 return super(HistoryManager, self).get_range(session, start, stop, raw, output) 

955 

956 ## ---------------------------- 

957 ## Methods for storing history: 

958 ## ---------------------------- 

def store_inputs(
    self, line_num: int, source: str, source_raw: Optional[str] = None
) -> None:
    """Record one input in the in-memory history, queue it for the
    database, and refresh the ``_i*`` input cache variables.

    Parameters
    ----------
    line_num : int
        The prompt number of this input.
    source : str
        Python input.
    source_raw : str, optional
        The raw input, before any IPython transformations were applied.
        When omitted, ``source`` doubles as the raw input.
    """
    raw_text = source if source_raw is None else source_raw
    parsed_text = source.rstrip("\n")
    raw_text = raw_text.rstrip("\n")

    # exit/quit commands are deliberately kept out of the history
    if self._exit_re.match(raw_text.strip()):
        return

    self.input_hist_parsed.append(parsed_text)
    self.input_hist_raw.append(raw_text)

    with self.db_input_cache_lock:
        self.db_input_cache.append((line_num, parsed_text, raw_text))
        # Once the cache has reached its configured size, ask for it to be
        # flushed to the database.
        cache_is_full = len(self.db_input_cache) >= self.db_cache_size
        if cache_is_full:
            if self.using_thread:
                self._restart_thread_if_stopped()
            if self.save_flag is not None:
                self.save_flag.set()

    # Shift the rolling _i / _ii / _iii variables by one input; the
    # right-hand side is evaluated in full before anything is assigned.
    self._iii, self._ii, self._i, self._i00 = (
        self._ii,
        self._i,
        self._i00,
        raw_text,
    )

    # hackish access to user namespace to create _i1,_i2... dynamically
    namespace_update = {
        "_i": self._i,
        "_ii": self._ii,
        "_iii": self._iii,
        "_i%s" % line_num: self._i00,
    }

    if self.shell is not None:
        self.shell.push(namespace_update, interactive=False)

1008 

def store_output(self, line_num: int) -> None:
    """If database output logging is enabled, queue the output of the
    indicated prompt number for writing to the database. It's
    called by run_cell after code has been executed.

    Nothing happens when ``db_log_output`` is off or when no output repr
    was recorded for ``line_num``.

    Parameters
    ----------
    line_num : int
        The line number from which to save outputs
    """
    if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
        return
    output = self.output_hist_reprs[line_num]

    with self.db_output_cache_lock:
        self.db_output_cache.append((line_num, output))
    # With a cache size of one or less, every entry is flushed right away,
    # so wake the saving thread immediately.
    if self.db_cache_size <= 1 and self.using_thread:
        self._restart_thread_if_stopped()
        if self.save_flag is not None:
            self.save_flag.set()

1030 

1031 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None: 

1032 with conn: 

1033 for line in self.db_input_cache: 

1034 conn.execute( 

1035 "INSERT INTO history VALUES (?, ?, ?, ?)", 

1036 (self.session_number,) + line, 

1037 ) 

1038 

1039 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None: 

1040 with conn: 

1041 for line in self.db_output_cache: 

1042 conn.execute( 

1043 "INSERT INTO output_history VALUES (?, ?, ?)", 

1044 (self.session_number,) + line, 

1045 ) 

1046 

@only_when_enabled
def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Flush the cached input and output entries to the database.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to write through. When omitted, ``self.db`` is used.
    """
    target = self.db if conn is None else conn

    with self.db_input_cache_lock:
        try:
            self._writeout_input_cache(target)
        except sqlite3.IntegrityError:
            # The (session, line) pair collided with an existing row:
            # switch to a fresh session and retry exactly once.
            self.new_session(target)
            print(
                "ERROR! Session/line number was not unique in",
                "database. History logging moved to new session",
                self.session_number,
            )
            try:
                # Try writing to the new session. If this fails, don't
                # recurse
                self._writeout_input_cache(target)
            except sqlite3.IntegrityError:
                pass
        finally:
            # The cache is emptied whether or not the write succeeded.
            self.db_input_cache = []

    with self.db_output_cache_lock:
        try:
            self._writeout_output_cache(target)
        except sqlite3.IntegrityError:
            print(
                "!! Session/line number for output was not unique",
                "in database. Output will not be stored.",
            )
        finally:
            # The cache is emptied whether or not the write succeeded.
            self.db_output_cache = []

1082 

1083 

# On platforms whose ``os`` module provides ``register_at_fork``, register
# HistoryManager._stop_thread as a hook that runs just before every fork.
# NOTE(review): presumably this stops the history-saving thread so it is not
# carried into the child process — confirm against _stop_thread's definition.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(before=HistoryManager._stop_thread)

1086 

1087 

1088from collections.abc import Callable, Iterator 

1089from weakref import ReferenceType 

1090 

1091 

@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that holds a strong reference to the target of a weak
    reference, so that the target is not GC'd during its context.

    The weak reference itself is what gets yielded; call it inside the
    ``with`` block to obtain the target. If the target is already gone when
    the context is entered, calling the weak reference returns ``None``.
    """
    strong = ref()
    try:
        yield ref
    finally:
        # Drop the strong reference on every exit path, including when the
        # body of the ``with`` block raises.
        del strong

1101 

1102 

class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for ``save_flag`` to be set, then asks the HistoryManager to
    write out the history cache. The main thread is responsible for setting
    the flag when the cache size reaches a defined threshold.

    The HistoryManager is only referenced weakly; once it has been garbage
    collected the thread shuts itself down the next time it wakes up."""

    save_flag: threading.Event
    daemon: bool = True
    _stop_now: bool = False
    enabled: bool = True
    history_manager: ref[HistoryManager]
    _stopped = False

    def __init__(self, history_manager: HistoryManager) -> None:
        super(HistorySavingThread, self).__init__(name="IPythonHistorySavingThread")
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled
        self.save_flag = threading.Event()

    @only_when_enabled
    def run(self) -> None:
        """Open a private database connection, then write out the history
        cache each time ``save_flag`` is set, until asked to stop."""
        atexit.register(self.stop)
        # We need a separate db connection per thread:
        try:
            hm: ReferenceType[HistoryManager]
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone: there is nothing to save
                    # and no database to connect to.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **hm().connection_options,  # type: ignore [union-attr]
                )
            while True:
                self.save_flag.wait()
                with hold(self.history_manager) as hm:
                    if hm() is None:
                        self._stop_now = True
                    if self._stop_now:
                        self.db.close()
                        return
                    self.save_flag.clear()
                    if hm() is not None:
                        hm().writeout_cache(self.db)  # type: ignore [union-attr]

        except Exception as e:
            print(
                (
                    "The history saving thread hit an unexpected error (%s). "
                    "History will not be written to the database."
                )
                % repr(e)
            )
        finally:
            atexit.unregister(self.stop)

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method."""
        if self._stopped:
            return
        self._stop_now = True

        # Wake the thread so that it notices the stop request.
        self.save_flag.set()
        self._stopped = True
        # A thread cannot join itself, so only wait when called from another
        # thread.
        if self is not threading.current_thread():
            self.join()

    def __del__(self) -> None:
        self.stop()

1176 

1177 

# Pattern for one history range token, e.g. ~5/8-~2/3, ~4/, 4-6 or 4-
# Session prefix: N/ or ~N/ -- the trailing slash is part of the prefix, so a
#   bare ~4 is not matched.
# Line numbers: N (just digits, no ~)
# Separator: - or :, optionally followed by an end session prefix and an end
#   line; the end line may be left out, as in 4-
range_re = re.compile(
    r"""
((?P<startsess>(?:~?\d+/)))?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>(?:~?\d+/)))?
 (?P<end>\d*))?
$""",
    re.VERBOSE,
)


def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    Whitespace separates the individual ranges. A ``~N/`` session prefix is
    reported as the negative session number ``-N``, and a missing prefix as
    session 0. With the ``-`` separator the end line is included (so stop is
    end + 1) and an omitted end gives a stop of None; with ``:`` the end is
    used as stop unchanged. Tokens that do not match the range syntax, or
    that name neither a start line nor a start session, are skipped.

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    >>> list(extract_hist_ranges("~4/"))
    [(-4, 1, None)]
    >>> list(extract_hist_ranges("4-"))
    [(0, 4, None)]
    >>> list(extract_hist_ranges("~4/4-"))
    [(-4, 4, None)]
    """
    if ranges_str == "":
        yield (0, 1, None)  # Everything from current session
        return

    for token in ranges_str.split():
        found = range_re.match(token)
        if found is None:
            continue
        parts = found.groupdict()

        first: int
        last: Optional[int]
        if parts["start"]:
            first = int(parts["start"])
            end_text = parts["end"]
            if parts["sep"] == "-":
                # "-" includes its end line; no end line means open-ended
                last = (int(end_text) + 1) if end_text else None
            else:
                # ":" (or no separator at all): no end line means just `first`
                last = int(end_text) if end_text else first + 1
        elif parts["startsess"]:
            # Only a session was named: take that whole session
            first, last = 1, None
        else:
            continue

        first_sess_text = parts["startsess"] or "0"
        last_sess_text = parts["endsess"] or first_sess_text
        first_sess = int(first_sess_text.rstrip("/").replace("~", "-"))
        last_sess = int(last_sess_text.rstrip("/").replace("~", "-"))
        assert last_sess >= first_sess, "start session must be earlier than end session"

        if last_sess == first_sess:
            yield (first_sess, first, last)
        else:
            # Multiple sessions in one range: the tail of the first session,
            # every session in between in full, then the head of the last.
            yield (first_sess, first, None)
            yield from ((between, 1, None) for between in range(first_sess + 1, last_sess))
            yield (last_sess, 1, last)

1249 

1250 

1251def _format_lineno(session: int, line: int) -> str: 

1252 """Helper function to format line numbers properly.""" 

1253 if session == 0: 

1254 return str(line) 

1255 return "%s#%s" % (session, line)