Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

505 statements  

1"""History related magics and functionality""" 

2 

3from __future__ import annotations 

4 

5# Copyright (c) IPython Development Team. 

6# Distributed under the terms of the Modified BSD License. 

7 

8 

9import atexit 

10import datetime 

11import os 

12import re 

13import weakref 

14 

15 

16import threading 

17from pathlib import Path 

18 

19from collections import defaultdict 

20from contextlib import contextmanager 

21from dataclasses import dataclass 

22from decorator import decorator 

23from traitlets import ( 

24 Any, 

25 Bool, 

26 Dict, 

27 Instance, 

28 Integer, 

29 List, 

30 TraitError, 

31 Unicode, 

32 Union, 

33 default, 

34 observe, 

35) 

36from traitlets.config.configurable import LoggingConfigurable 

37 

38from IPython.paths import locate_profile 

39from IPython.utils.decorators import undoc 

40from typing import Tuple, Optional, TYPE_CHECKING 

41from collections.abc import Iterable 

42import typing 

43import typing as t 

44from typing import cast 

45from warnings import warn 

46from weakref import ref, WeakSet 

47 

48if TYPE_CHECKING: 

49 from IPython.core.interactiveshell import InteractiveShell 

50 from traitlets.config import Config as Configuration 

51 

52try: 

53 from sqlite3 import DatabaseError, OperationalError 

54 import sqlite3 

55 

56 sqlite3.register_converter( 

57 "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode()) 

58 ) 

59 

60 sqlite3_found = True 

61except ModuleNotFoundError: 

62 sqlite3_found = False 

63 

64 class DatabaseError(Exception): # type: ignore [no-redef] 

65 pass 

66 

67 class OperationalError(Exception): # type: ignore [no-redef] 

68 pass 

69 

70 

71InOrInOut = typing.Union[str, tuple[str, Optional[str]]] 

72 

73# ----------------------------------------------------------------------------- 

74# Classes and functions 

75# ----------------------------------------------------------------------------- 

76 

77 

78@undoc 

class DummyDB:
    """Dummy DB that will act as a black hole for history.

    Only used in the absence of sqlite.

    It mimics the small part of the ``sqlite3.Connection`` interface used in
    this module (``execute``, ``commit``, ``close`` and the context-manager
    protocol); every operation is a no-op.
    """

    def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
        """Accept any statement and parameters and return no rows."""
        return []

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Do nothing: there is never anything to commit."""
        pass

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Enter a no-op transaction.

        Returns the object itself, as ``sqlite3.Connection.__enter__`` does,
        so that ``with db as conn:`` binds a usable object instead of None.
        """
        return self

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Leave the no-op transaction; exceptions are never suppressed."""
        return None

    def close(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Do nothing: there is no underlying connection to close."""
        pass

98 

99 

100@decorator 

def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: return an empty list in the absence of sqlite.

    The wrapped callable ``f`` is only invoked when ``self.enabled`` is
    truthy; otherwise it is skipped entirely and ``[]`` is returned.
    """
    return f(self, *a, **kw) if self.enabled else []

107 

108 

109# use 16kB as threshold for whether a corrupt history db should be saved 

110# that should be at least 100 entries or so 

111_SAVE_DB_SIZE = 16384 

112 

113 

114@decorator 

def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """A decorator which wraps HistoryAccessor method calls to catch errors from
    a corrupt SQLite database, move the old database out of the way, and create
    a new one.

    We avoid clobbering larger databases because this may be triggered due to
    filesystem issues, not just a corrupt file.

    ``self.hist_file`` may be a ``Path`` or a plain string, so it is converted
    to a ``Path`` before any filesystem operation and compared to
    ``":memory:"`` through ``str()``.

    Returns
    -------
    Whatever ``f`` returns on success, or an empty list once a database error
    has been handled and ``self.init_db()`` has been called again.

    Raises
    ------
    DatabaseError, OperationalError
        Re-raised unchanged when the failure happens while already using the
        ``:memory:`` database.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if str(self.hist_file) == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # hist_file can be a str; only Path offers is_file/stat/rename.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                base = str(hist_path.parent / hist_path.stem)
                ext = hist_path.suffix
                size = hist_path.stat().st_size
                if size >= _SAVE_DB_SIZE:
                    # if there's significant content, avoid clobbering
                    now = (
                        datetime.datetime.now(datetime.timezone.utc)
                        .isoformat()
                        .replace(":", ".")
                    )
                    newpath = base + "-corrupt-" + now + ext
                    # don't clobber previous corrupt backups
                    for i in range(100):
                        if not Path(newpath).exists():
                            break
                        newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
                else:
                    # not much content, possibly empty; don't worry about clobbering
                    # maybe we should just delete it?
                    newpath = base + "-corrupt" + ext
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []

166 

167 

class HistoryAccessorBase(LoggingConfigurable):
    """An abstract class for History Accessors

    It only declares the signatures of the history retrieval methods; every
    method here raises ``NotImplementedError`` and is expected to be overridden
    by a concrete subclass.
    """

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Abstract: not implemented in this base class."""
        raise NotImplementedError

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Abstract: not implemented in this base class."""
        raise NotImplementedError

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Abstract: not implemented in this base class."""
        raise NotImplementedError

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Abstract: not implemented in this base class."""
        raise NotImplementedError

205 

206 

207class HistoryAccessor(HistoryAccessorBase): 

208 """Access the history database without adding to it. 

209 

210 This is intended for use by standalone history tools. IPython shells use 

211 HistoryManager, below, which is a subclass of this.""" 

212 

213 # counter for init_db retries, so we don't keep trying over and over 

214 _corrupt_db_counter = 0 

215 # after two failures, fallback on :memory: 

216 _corrupt_db_limit = 2 

217 

218 # String holding the path to the history file 

219 hist_file = Union( 

220 [Instance(Path), Unicode()], 

221 help="""Path to file to use for SQLite history database. 

222 

223 By default, IPython will put the history database in the IPython 

224 profile directory. If you would rather share one history among 

225 profiles, you can set this value in each, so that they are consistent. 

226 

227 Due to an issue with fcntl, SQLite is known to misbehave on some NFS 

228 mounts. If you see IPython hanging, try setting this to something on a 

229 local disk, e.g:: 

230 

231 ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite 

232 

233 you can also use the specific value `:memory:` (including the colon 

234 at both end but not the back ticks), to avoid creating an history file. 

235 

236 """, 

237 ).tag(config=True) 

238 

239 enabled = Bool( 

240 sqlite3_found, 

241 help="""enable the SQLite history 

242 

243 set enabled=False to disable the SQLite history, 

244 in which case there will be no stored history, no SQLite connection, 

245 and no background saving thread. This may be necessary in some 

246 threaded environments where IPython is embedded. 

247 """, 

248 ).tag(config=True) 

249 

250 connection_options = Dict( 

251 help="""Options for configuring the SQLite connection 

252 

253 These options are passed as keyword args to sqlite3.connect 

254 when establishing database connections. 

255 """ 

256 ).tag(config=True) 

257 

258 @default("connection_options") 

259 def _default_connection_options(self) -> dict[str, bool]: 

260 return dict(check_same_thread=False) 

261 

262 # The SQLite database 

263 db = Any() 

264 

265 @observe("db") 

266 @only_when_enabled 

267 def _db_changed(self, change): # type: ignore [no-untyped-def] 

268 """validate the db, since it can be an Instance of two different types""" 

269 new = change["new"] 

270 connection_types = (DummyDB, sqlite3.Connection) 

271 if not isinstance(new, connection_types): 

272 msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % ( 

273 self.__class__.__name__, 

274 new, 

275 ) 

276 raise TraitError(msg) 

277 

278 def __init__( 

279 self, profile: str = "default", hist_file: str = "", **traits: typing.Any 

280 ) -> None: 

281 """Create a new history accessor. 

282 

283 Parameters 

284 ---------- 

285 profile : str 

286 The name of the profile from which to open history. 

287 hist_file : str 

288 Path to an SQLite history database stored by IPython. If specified, 

289 hist_file overrides profile. 

290 config : :class:`~traitlets.config.loader.Config` 

291 Config object. hist_file can also be set through this. 

292 """ 

293 super(HistoryAccessor, self).__init__(**traits) 

294 # defer setting hist_file from kwarg until after init, 

295 # otherwise the default kwarg value would clobber any value 

296 # set by config 

297 if hist_file: 

298 self.hist_file = hist_file 

299 

300 try: 

301 self.hist_file 

302 except TraitError: 

303 # No one has set the hist_file, yet. 

304 self.hist_file = self._get_hist_file_name(profile) 

305 

306 self.init_db() 

307 

308 def _get_hist_file_name(self, profile: str = "default") -> Path: 

309 """Find the history file for the given profile name. 

310 

311 This is overridden by the HistoryManager subclass, to use the shell's 

312 active profile. 

313 

314 Parameters 

315 ---------- 

316 profile : str 

317 The name of a profile which has a history file. 

318 """ 

319 return Path(locate_profile(profile)) / "history.sqlite" 

320 

321 @catch_corrupt_db 

322 def init_db(self) -> None: 

323 """Connect to the database, and create tables if necessary.""" 

324 if not self.enabled: 

325 self.db = DummyDB() 

326 self._finalizer = weakref.finalize(self, lambda db: db.close(), self.db) 

327 return 

328 

329 # use detect_types so that timestamps return datetime objects 

330 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) 

331 kwargs.update(self.connection_options) 

332 self.db = sqlite3.connect(str(self.hist_file), **kwargs) # type: ignore [call-overload] 

333 self._finalizer = weakref.finalize(self, lambda db: db.close(), self.db) 

334 with self.db: 

335 self.db.execute( 

336 """CREATE TABLE IF NOT EXISTS sessions (session integer 

337 primary key autoincrement, start timestamp, 

338 end timestamp, num_cmds integer, remark text)""" 

339 ) 

340 self.db.execute( 

341 """CREATE TABLE IF NOT EXISTS history 

342 (session integer, line integer, source text, source_raw text, 

343 PRIMARY KEY (session, line))""" 

344 ) 

345 # Output history is optional, but ensure the table's there so it can be 

346 # enabled later. 

347 self.db.execute( 

348 """CREATE TABLE IF NOT EXISTS output_history 

349 (session integer, line integer, output text, 

350 PRIMARY KEY (session, line))""" 

351 ) 

352 # success! reset corrupt db count 

353 self._corrupt_db_counter = 0 

354 

355 def writeout_cache(self) -> None: 

356 """Overridden by HistoryManager to dump the cache before certain 

357 database lookups.""" 

358 pass 

359 

360 ## ------------------------------- 

361 ## Methods for retrieving history: 

362 ## ------------------------------- 

363 def _run_sql( 

364 self, 

365 sql: str, 

366 params: tuple, 

367 raw: bool = True, 

368 output: bool = False, 

369 latest: bool = False, 

370 ) -> Iterable[tuple[int, int, InOrInOut]]: 

371 """Prepares and runs an SQL query for the history database. 

372 

373 Parameters 

374 ---------- 

375 sql : str 

376 Any filtering expressions to go after SELECT ... FROM ... 

377 params : tuple 

378 Parameters passed to the SQL query (to replace "?") 

379 raw, output : bool 

380 See :meth:`get_range` 

381 latest : bool 

382 Select rows with max (session, line) 

383 

384 Returns 

385 ------- 

386 Tuples as :meth:`get_range` 

387 """ 

388 toget = "source_raw" if raw else "source" 

389 sqlfrom = "history" 

390 if output: 

391 sqlfrom = "history LEFT JOIN output_history USING (session, line)" 

392 toget = "history.%s, output_history.output" % toget 

393 if latest: 

394 toget += ", MAX(session * 128 * 1024 + line)" 

395 this_querry = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql 

396 cur = self.db.execute(this_querry, params) 

397 if latest: 

398 cur = (row[:-1] for row in cur) 

399 if output: # Regroup into 3-tuples, and parse JSON 

400 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur) 

401 return cur 

402 

403 @only_when_enabled 

404 @catch_corrupt_db 

405 def get_session_info( 

406 self, session: int 

407 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]: 

408 """Get info about a session. 

409 

410 Parameters 

411 ---------- 

412 session : int 

413 Session number to retrieve. 

414 

415 Returns 

416 ------- 

417 session_id : int 

418 Session ID number 

419 start : datetime 

420 Timestamp for the start of the session. 

421 end : datetime 

422 Timestamp for the end of the session, or None if IPython crashed. 

423 num_cmds : int 

424 Number of commands run, or None if IPython crashed. 

425 remark : str 

426 A manually set description. 

427 """ 

428 query = "SELECT * from sessions where session == ?" 

429 return self.db.execute(query, (session,)).fetchone() 

430 

431 @catch_corrupt_db 

432 def get_last_session_id(self) -> Optional[int]: 

433 """Get the last session ID currently in the database. 

434 

435 Within IPython, this should be the same as the value stored in 

436 :attr:`HistoryManager.session_number`. 

437 """ 

438 for record in self.get_tail(n=1, include_latest=True): 

439 return record[0] 

440 return None 

441 

442 @catch_corrupt_db 

443 def get_tail( 

444 self, 

445 n: int = 10, 

446 raw: bool = True, 

447 output: bool = False, 

448 include_latest: bool = False, 

449 ) -> Iterable[tuple[int, int, InOrInOut]]: 

450 """Get the last n lines from the history database. 

451 

452 Parameters 

453 ---------- 

454 n : int 

455 The number of lines to get 

456 raw, output : bool 

457 See :meth:`get_range` 

458 include_latest : bool 

459 If False (default), n+1 lines are fetched, and the latest one 

460 is discarded. This is intended to be used where the function 

461 is called by a user command, which it should not return. 

462 

463 Returns 

464 ------- 

465 Tuples as :meth:`get_range` 

466 """ 

467 self.writeout_cache() 

468 if not include_latest: 

469 n += 1 

470 cur = self._run_sql( 

471 "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output 

472 ) 

473 if not include_latest: 

474 return reversed(list(cur)[1:]) 

475 return reversed(list(cur)) 

476 

477 @catch_corrupt_db 

478 def search( 

479 self, 

480 pattern: str = "*", 

481 raw: bool = True, 

482 search_raw: bool = True, 

483 output: bool = False, 

484 n: Optional[int] = None, 

485 unique: bool = False, 

486 ) -> Iterable[tuple[int, int, InOrInOut]]: 

487 """Search the database using unix glob-style matching (wildcards 

488 * and ?). 

489 

490 Parameters 

491 ---------- 

492 pattern : str 

493 The wildcarded pattern to match when searching 

494 search_raw : bool 

495 If True, search the raw input, otherwise, the parsed input 

496 raw, output : bool 

497 See :meth:`get_range` 

498 n : None or int 

499 If an integer is given, it defines the limit of 

500 returned entries. 

501 unique : bool 

502 When it is true, return only unique entries. 

503 

504 Returns 

505 ------- 

506 Tuples as :meth:`get_range` 

507 """ 

508 tosearch = "source_raw" if search_raw else "source" 

509 if output: 

510 tosearch = "history." + tosearch 

511 self.writeout_cache() 

512 sqlform = "WHERE %s GLOB ?" % tosearch 

513 params: tuple[typing.Any, ...] = (pattern,) 

514 if unique: 

515 sqlform += " GROUP BY {0}".format(tosearch) 

516 if n is not None: 

517 sqlform += " ORDER BY session DESC, line DESC LIMIT ?" 

518 params += (n,) 

519 elif unique: 

520 sqlform += " ORDER BY session, line" 

521 cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique) 

522 if n is not None: 

523 return reversed(list(cur)) 

524 return cur 

525 

526 @catch_corrupt_db 

527 def get_range( 

528 self, 

529 session: int, 

530 start: int = 1, 

531 stop: Optional[int] = None, 

532 raw: bool = True, 

533 output: bool = False, 

534 ) -> Iterable[tuple[int, int, InOrInOut]]: 

535 """Retrieve input by session. 

536 

537 Parameters 

538 ---------- 

539 session : int 

540 Session number to retrieve. 

541 start : int 

542 First line to retrieve. 

543 stop : int 

544 End of line range (excluded from output itself). If None, retrieve 

545 to the end of the session. 

546 raw : bool 

547 If True, return untranslated input 

548 output : bool 

549 If True, attempt to include output. This will be 'real' Python 

550 objects for the current session, or text reprs from previous 

551 sessions if db_log_output was enabled at the time. Where no output 

552 is found, None is used. 

553 

554 Returns 

555 ------- 

556 entries 

557 An iterator over the desired lines. Each line is a 3-tuple, either 

558 (session, line, input) if output is False, or 

559 (session, line, (input, output)) if output is True. 

560 """ 

561 params: tuple[typing.Any, ...] 

562 if stop: 

563 lineclause = "line >= ? AND line < ?" 

564 params = (session, start, stop) 

565 else: 

566 lineclause = "line>=?" 

567 params = (session, start) 

568 

569 return self._run_sql( 

570 "WHERE session==? AND %s" % lineclause, params, raw=raw, output=output 

571 ) 

572 

573 def get_range_by_str( 

574 self, rangestr: str, raw: bool = True, output: bool = False 

575 ) -> Iterable[tuple[int, int, InOrInOut]]: 

576 """Get lines of history from a string of ranges, as used by magic 

577 commands %hist, %save, %macro, etc. 

578 

579 Parameters 

580 ---------- 

581 rangestr : str 

582 A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used, 

583 this will return everything from current session's history. 

584 

585 See the documentation of :func:`%history` for the full details. 

586 

587 raw, output : bool 

588 As :meth:`get_range` 

589 

590 Returns 

591 ------- 

592 Tuples as :meth:`get_range` 

593 """ 

594 for sess, s, e in extract_hist_ranges(rangestr): 

595 yield from self.get_range(sess, s, e, raw=raw, output=output) 

596 

597 

@dataclass
class HistoryOutput:
    """One recorded output: a kind tag plus its data bundle."""

    # Which kind of output this record holds; restricted to the four literal
    # names listed below.
    output_type: typing.Literal[
        "out_stream", "err_stream", "display_data", "execute_result"
    ]
    # String-keyed payload; values are a string or a list of strings.
    # NOTE(review): keys are presumably MIME types - confirm against producers.
    bundle: typing.Dict[str, str | list[str]]

604 

605 

606class HistoryManager(HistoryAccessor): 

607 """A class to organize all history-related functionality in one place.""" 

608 

609 # Public interface 

610 

611 # An instance of the IPython shell we are attached to 

612 shell = Instance( 

613 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False 

614 ) 

615 # Lists to hold processed and raw history. These start with a blank entry 

616 # so that we can index them starting from 1 

617 input_hist_parsed = List([""]) 

618 input_hist_raw = List([""]) 

619 # A list of directories visited during session 

620 dir_hist: List = List() 

621 

622 @default("dir_hist") 

623 def _dir_hist_default(self) -> list[Path]: 

624 try: 

625 return [Path.cwd()] 

626 except OSError: 

627 return [] 

628 

629 # A dict of output history, keyed with ints from the shell's 

630 # execution count. 

631 output_hist = Dict() 

632 # The text/plain repr of outputs. 

633 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment] 

634 # Maps execution_count to MIME bundles 

635 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list) 

636 # Maps execution_count to exception tracebacks 

637 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment] 

638 

639 # The number of the current session in the history database 

640 session_number: int = Integer() # type: ignore [assignment] 

641 

642 db_log_output = Bool( 

643 False, help="Should the history database include output? (default: no)" 

644 ).tag(config=True) 

645 db_cache_size = Integer( 

646 0, 

647 help="Write to database every x commands (higher values save disk access & power).\n" 

648 "Values of 1 or less effectively disable caching.", 

649 ).tag(config=True) 

650 # The input and output caches 

651 db_input_cache: List[tuple[int, str, str]] = List() 

652 db_output_cache: List[tuple[int, str]] = List() 

653 

654 # History saving in separate thread 

655 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True) 

656 

657 @property 

658 def save_flag(self) -> threading.Event | None: 

659 if self.save_thread is not None: 

660 return self.save_thread.save_flag 

661 return None 

662 

663 # Private interface 

664 # Variables used to store the three last inputs from the user. On each new 

665 # history update, we populate the user's namespace with these, shifted as 

666 # necessary. 

667 _i00 = Unicode("") 

668 _i = Unicode("") 

669 _ii = Unicode("") 

670 _iii = Unicode("") 

671 

672 # A regex matching all forms of the exit command, so that we don't store 

673 # them in the history (it's annoying to rewind the first entry and land on 

674 # an exit call). 

675 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$") 

676 

677 _instances: WeakSet[HistoryManager] = WeakSet() 

678 _max_inst: int | float = float("inf") 

679 

680 def __init__( 

681 self, 

682 shell: InteractiveShell, 

683 config: Optional[Configuration] = None, 

684 **traits: typing.Any, 

685 ): 

686 """Create a new history manager associated with a shell instance.""" 

687 super().__init__(shell=shell, config=config, **traits) 

688 self.db_input_cache_lock = threading.Lock() 

689 self.db_output_cache_lock = threading.Lock() 

690 

691 try: 

692 self.new_session() 

693 except OperationalError: 

694 self.log.error( 

695 "Failed to create history session in %s. History will not be saved.", 

696 self.hist_file, 

697 exc_info=True, 

698 ) 

699 self._switch_to_memory_history() 

700 

701 self.using_thread = False 

702 if self.enabled and self.hist_file != ":memory:": 

703 self.save_thread = HistorySavingThread(self) 

704 try: 

705 self.save_thread.start() 

706 except RuntimeError: 

707 self.log.error( 

708 "Failed to start history saving thread. History will not be saved.", 

709 exc_info=True, 

710 ) 

711 self._switch_to_memory_history() 

712 self.save_thread = None 

713 else: 

714 self.using_thread = True 

715 self._instances.add(self) 

716 assert len(HistoryManager._instances) <= HistoryManager._max_inst, ( 

717 len(HistoryManager._instances), 

718 HistoryManager._max_inst, 

719 ) 

720 

    def _switch_to_memory_history(self) -> None:
        """Switch history storage to an in-memory SQLite database.

        The current connection is closed on a best-effort basis, ``hist_file``
        is pointed at ``:memory:``, the database is initialised again with
        ``init_db`` and a new session is opened with ``new_session``.
        """
        try:
            self.db.close()
        except Exception:
            # Best effort: a failure to close the old connection must not
            # prevent the switch.
            pass
        self.hist_file = ":memory:"
        self.init_db()
        self.new_session()

730 

731 def __del__(self) -> None: 

732 if self.save_thread is not None: 

733 self.save_thread.stop() 

734 

735 @classmethod 

736 def _stop_thread(cls) -> None: 

737 # Used before forking so the thread isn't running at fork 

738 for inst in cls._instances: 

739 if inst.save_thread is not None: 

740 inst.save_thread.stop() 

741 inst.save_thread = None 

742 

743 def _restart_thread_if_stopped(self) -> None: 

744 # Start the thread again after it was stopped for forking 

745 if self.save_thread is None and self.using_thread: 

746 self.save_thread = HistorySavingThread(self) 

747 self.save_thread.start() 

748 

749 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path: 

750 """Get default history file name based on the Shell's profile. 

751 

752 The profile parameter is ignored, but must exist for compatibility with 

753 the parent class.""" 

754 profile_dir = self.shell.profile_dir.location 

755 return Path(profile_dir) / "history.sqlite" 

756 

757 @only_when_enabled 

758 def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None: 

759 """Get a new session number.""" 

760 if conn is None: 

761 conn = self.db 

762 

763 with conn: 

764 cur = conn.execute( 

765 """INSERT INTO sessions VALUES (NULL, ?, NULL, 

766 NULL, '') """, 

767 (datetime.datetime.now().isoformat(" "),), 

768 ) 

769 assert isinstance(cur.lastrowid, int) 

770 self.session_number = cur.lastrowid 

771 

772 def end_session(self) -> None: 

773 """Close the database session, filling in the end time and line count.""" 

774 self.writeout_cache() 

775 with self.db: 

776 self.db.execute( 

777 """UPDATE sessions SET end=?, num_cmds=? WHERE 

778 session==?""", 

779 ( 

780 datetime.datetime.now(datetime.timezone.utc).isoformat(" "), 

781 len(self.input_hist_parsed) - 1, 

782 self.session_number, 

783 ), 

784 ) 

785 self.session_number = 0 

786 

787 def name_session(self, name: str) -> None: 

788 """Give the current session a name in the history database.""" 

789 warn( 

790 "name_session is deprecated in IPython 9.0 and will be removed in future versions", 

791 DeprecationWarning, 

792 stacklevel=2, 

793 ) 

794 with self.db: 

795 self.db.execute( 

796 "UPDATE sessions SET remark=? WHERE session==?", 

797 (name, self.session_number), 

798 ) 

799 

800 def reset(self, new_session: bool = True) -> None: 

801 """Clear the session history, releasing all object references, and 

802 optionally open a new session.""" 

803 self.output_hist.clear() 

804 self.outputs.clear() 

805 self.exceptions.clear() 

806 

807 # The directory history can't be completely empty 

808 self.dir_hist[:] = [Path.cwd()] 

809 

810 if new_session: 

811 if self.session_number: 

812 self.end_session() 

813 self.input_hist_parsed[:] = [""] 

814 self.input_hist_raw[:] = [""] 

815 self.new_session() 

816 

817 # ------------------------------ 

818 # Methods for retrieving history 

819 # ------------------------------ 

820 def get_session_info( 

821 self, session: int = 0 

822 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]: 

823 """Get info about a session. 

824 

825 Parameters 

826 ---------- 

827 session : int 

828 Session number to retrieve. The current session is 0, and negative 

829 numbers count back from current session, so -1 is the previous session. 

830 

831 Returns 

832 ------- 

833 session_id : int 

834 Session ID number 

835 start : datetime 

836 Timestamp for the start of the session. 

837 end : datetime 

838 Timestamp for the end of the session, or None if IPython crashed. 

839 num_cmds : int 

840 Number of commands run, or None if IPython crashed. 

841 remark : str 

842 A manually set description. 

843 """ 

844 if session <= 0: 

845 session += self.session_number 

846 

847 return super(HistoryManager, self).get_session_info(session=session) 

848 

849 @catch_corrupt_db 

850 def get_tail( 

851 self, 

852 n: int = 10, 

853 raw: bool = True, 

854 output: bool = False, 

855 include_latest: bool = False, 

856 ) -> Iterable[tuple[int, int, InOrInOut]]: 

857 """Get the last n lines from the history database. 

858 

859 Most recent entry last. 

860 

861 Completion will be reordered so that that the last ones are when 

862 possible from current session. 

863 

864 Parameters 

865 ---------- 

866 n : int 

867 The number of lines to get 

868 raw, output : bool 

869 See :meth:`get_range` 

870 include_latest : bool 

871 If False (default), n+1 lines are fetched, and the latest one 

872 is discarded. This is intended to be used where the function 

873 is called by a user command, which it should not return. 

874 

875 Returns 

876 ------- 

877 Tuples as :meth:`get_range` 

878 """ 

879 self.writeout_cache() 

880 if not include_latest: 

881 n += 1 

882 # cursor/line/entry 

883 this_cur = list( 

884 self._run_sql( 

885 "WHERE session == ? ORDER BY line DESC LIMIT ? ", 

886 (self.session_number, n), 

887 raw=raw, 

888 output=output, 

889 ) 

890 ) 

891 other_cur = list( 

892 self._run_sql( 

893 "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?", 

894 (self.session_number, n), 

895 raw=raw, 

896 output=output, 

897 ) 

898 ) 

899 

900 everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur 

901 

902 everything = everything[:n] 

903 

904 if not include_latest: 

905 return list(everything)[:0:-1] 

906 return list(everything)[::-1] 

907 

908 def _get_range_session( 

909 self, 

910 start: int = 1, 

911 stop: Optional[int] = None, 

912 raw: bool = True, 

913 output: bool = False, 

914 ) -> Iterable[tuple[int, int, InOrInOut]]: 

915 """Get input and output history from the current session. Called by 

916 get_range, and takes similar parameters.""" 

917 input_hist = self.input_hist_raw if raw else self.input_hist_parsed 

918 

919 n = len(input_hist) 

920 if start < 0: 

921 start += n 

922 if not stop or (stop > n): 

923 stop = n 

924 elif stop < 0: 

925 stop += n 

926 line: InOrInOut 

927 for i in range(start, stop): 

928 if output: 

929 line = (input_hist[i], self.output_hist_reprs.get(i)) 

930 else: 

931 line = input_hist[i] 

932 yield (0, i, line) 

933 

934 def get_range( 

935 self, 

936 session: int = 0, 

937 start: int = 1, 

938 stop: Optional[int] = None, 

939 raw: bool = True, 

940 output: bool = False, 

941 ) -> Iterable[tuple[int, int, InOrInOut]]: 

942 """Retrieve input by session. 

943 

944 Parameters 

945 ---------- 

946 session : int 

947 Session number to retrieve. The current session is 0, and negative 

948 numbers count back from current session, so -1 is previous session. 

949 start : int 

950 First line to retrieve. 

951 stop : int 

952 End of line range (excluded from output itself). If None, retrieve 

953 to the end of the session. 

954 raw : bool 

955 If True, return untranslated input 

956 output : bool 

957 If True, attempt to include output. This will be 'real' Python 

958 objects for the current session, or text reprs from previous 

959 sessions if db_log_output was enabled at the time. Where no output 

960 is found, None is used. 

961 

962 Returns 

963 ------- 

964 entries 

965 An iterator over the desired lines. Each line is a 3-tuple, either 

966 (session, line, input) if output is False, or 

967 (session, line, (input, output)) if output is True. 

968 """ 

969 if session <= 0: 

970 session += self.session_number 

971 if session == self.session_number: # Current session 

972 return self._get_range_session(start, stop, raw, output) 

973 return super(HistoryManager, self).get_range(session, start, stop, raw, output) 

974 

975 ## ---------------------------- 

976 ## Methods for storing history: 

977 ## ---------------------------- 

def store_inputs(
    self, line_num: int, source: str, source_raw: Optional[str] = None
) -> None:
    """Record one input in the in-memory history and the database cache,
    and refresh the ``_i*`` input variables.

    Parameters
    ----------
    line_num : int
        The prompt number of this input.
    source : str
        Python input.
    source_raw : str, optional
        The raw input, before any IPython transformation. Defaults to
        ``source`` when not given.
    """
    raw_text = (source if source_raw is None else source_raw).rstrip("\n")
    parsed_text = source.rstrip("\n")

    # exit/quit commands are never recorded
    if self._exit_re.match(raw_text.strip()):
        return

    self.input_hist_parsed.append(parsed_text)
    self.input_hist_raw.append(raw_text)

    with self.db_input_cache_lock:
        self.db_input_cache.append((line_num, parsed_text, raw_text))
        cache_is_full = len(self.db_input_cache) >= self.db_cache_size
        # A full cache asks the saving thread (when one is used) to flush
        # it to the database.
        if cache_is_full and self.using_thread:
            self._restart_thread_if_stopped()
            if self.save_flag is not None:
                self.save_flag.set()

    # Shift the rolling input variables by one position.
    self._iii, self._ii, self._i, self._i00 = (
        self._ii,
        self._i,
        self._i00,
        raw_text,
    )

    shell = self.shell
    if shell is None:
        return
    # hackish access to the user namespace to create _i1, _i2... dynamically
    numbered_name = "_i%s" % line_num
    shell.push(
        {
            "_i": self._i,
            "_ii": self._ii,
            "_iii": self._iii,
            numbered_name: self._i00,
        },
        interactive=False,
    )

1027 

def store_output(self, line_num: int) -> None:
    """Queue the output repr of one prompt for writing to the database.

    Does nothing unless database output logging (``db_log_output``) is
    enabled and an output repr exists for ``line_num``. It's called by
    run_cell after code has been executed.

    Parameters
    ----------
    line_num : int
        The line number from which to save outputs
    """
    if not self.db_log_output or line_num not in self.output_hist_reprs:
        return
    output = self.output_hist_reprs[line_num]

    with self.db_output_cache_lock:
        self.db_output_cache.append((line_num, output))
    # With a cache size of at most one, every entry is flushed right away:
    # wake the saving thread (when one is used).
    if self.db_cache_size <= 1 and self.using_thread:
        self._restart_thread_if_stopped()
        if self.save_flag is not None:
            self.save_flag.set()

1049 

1050 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None: 

1051 with conn: 

1052 for line in self.db_input_cache: 

1053 conn.execute( 

1054 "INSERT INTO history VALUES (?, ?, ?, ?)", 

1055 (self.session_number,) + line, 

1056 ) 

1057 

1058 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None: 

1059 with conn: 

1060 for line in self.db_output_cache: 

1061 conn.execute( 

1062 "INSERT INTO output_history VALUES (?, ?, ?)", 

1063 (self.session_number,) + line, 

1064 ) 

1065 

@only_when_enabled
def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Write any entries in the cache to the database.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to write through; ``self.db`` is used when omitted.

    Notes
    -----
    Both caches are emptied afterwards whether or not the write succeeded.
    An ``IntegrityError`` on the input cache moves logging to a new
    session and retries once; on the output cache it is only reported.
    """
    db_conn = self.db if conn is None else conn

    with self.db_input_cache_lock:
        try:
            self._writeout_input_cache(db_conn)
        except sqlite3.IntegrityError:
            # The (session, line) pair already exists: start a fresh
            # session and try once more under the new session number.
            self.new_session(db_conn)
            print(
                "ERROR! Session/line number was not unique in",
                "database. History logging moved to new session",
                self.session_number,
            )
            try:
                # A second failure is dropped on purpose: don't recurse.
                self._writeout_input_cache(db_conn)
            except sqlite3.IntegrityError:
                pass
        finally:
            self.db_input_cache = []

    with self.db_output_cache_lock:
        try:
            self._writeout_output_cache(db_conn)
        except sqlite3.IntegrityError:
            print(
                "!! Session/line number for output was not unique",
                "in database. Output will not be stored.",
            )
        finally:
            self.db_output_cache = []

1101 

1102 

# Where the platform provides ``os.register_at_fork``, have
# ``HistoryManager._stop_thread`` called in the parent just before any fork.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(before=HistoryManager._stop_thread)

1105 

1106 

1107from collections.abc import Callable, Iterator 

1108from weakref import ReferenceType 

1109 

1110 

@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that keeps the target of a weak reference alive.

    A strong reference to the referent is taken on entry and dropped on
    normal exit, so the object cannot be garbage collected while the body
    of the ``with`` statement runs. The weak reference itself is what the
    ``with`` statement receives.
    """
    strong = ref()
    yield ref
    del strong

1120 

1121 

class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for its ``save_flag`` event to be set, then asks the
    HistoryManager to write out the history cache. The main thread is
    responsible for setting the flag when the cache size reaches a defined
    threshold.

    The manager is only referenced weakly, so this thread does not keep it
    alive; once the manager is gone the thread shuts itself down.
    """

    # Set to request a write-out, or to wake the thread up for shutdown.
    save_flag: threading.Event
    daemon: bool = True
    # When True, ``run`` closes its connection and returns at its next wake-up.
    _stop_now: bool = False
    enabled: bool = True
    # Weak reference to the owning HistoryManager.
    history_manager: ref[HistoryManager]
    # Guards ``stop`` against running more than once.
    _stopped: bool = False

    def __init__(self, history_manager: HistoryManager) -> None:
        """Create the (not yet started) thread for ``history_manager``."""
        super().__init__(name="IPythonHistorySavingThread")
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled
        self.save_flag = threading.Event()

    @only_when_enabled
    def run(self) -> None:
        """Write out the history cache each time ``save_flag`` is set.

        Returns when ``stop`` has been requested or the manager has been
        garbage collected. Any other exception is reported on stdout and
        ends the thread.
        """
        atexit.register(self.stop)
        # We need a separate db connection per thread:
        try:
            hm: ReferenceType[HistoryManager]
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone: there is nothing to
                    # save and no database to open.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **cast(dict[str, t.Any], hm().connection_options),  # type: ignore [union-attr]
                )
            while True:
                self.save_flag.wait()
                with hold(self.history_manager) as hm:
                    if hm() is None:
                        self._stop_now = True
                    if self._stop_now:
                        self.db.close()
                        return
                    self.save_flag.clear()
                    if hm() is not None:
                        hm().writeout_cache(self.db)  # type: ignore [union-attr]

        except Exception as e:
            print(
                (
                    "The history saving thread hit an unexpected error (%s). "
                    "History will not be written to the database."
                )
                % repr(e)
            )
        finally:
            atexit.unregister(self.stop)

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method."""
        if self._stopped:
            return
        self._stop_now = True

        # Wake ``run`` up so that it notices the stop request.
        self.save_flag.set()
        self._stopped = True
        # Only join a thread that was started, and never from within itself.
        if self.ident is not None and self != threading.current_thread():
            self.join()

    def __del__(self) -> None:
        self.stop()

1195 

1196 

# To match, e.g. ~5/8-~2/3, or ~4/ (session only, meaning the full session)
# Session numbers: ~N/ or N/ (the trailing slash is required)
# Line numbers: N (just digits, no ~)
# Range syntax: 4-6 (end included), 4:6 (end excluded) or 4- (without end,
# means "onward")
range_re = re.compile(
    r"""
((?P<startsess>(?:~?\d+/)))?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>(?:~?\d+/)))?
 (?P<end>\d*))?
$""",
    re.VERBOSE,
)


def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    Whitespace-separated items that do not look like a range, or that have
    neither a session nor a start line, are silently skipped. ``stop`` is
    exclusive; ``None`` means "to the end of the session".

    Raises
    ------
    AssertionError
        If a range ends in an earlier session than it starts in.

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    >>> list(extract_hist_ranges("~4/"))
    [(-4, 1, None)]
    >>> list(extract_hist_ranges("4-"))
    [(0, 4, None)]
    >>> list(extract_hist_ranges("~4/4-"))
    [(-4, 4, None)]
    """
    if ranges_str == "":
        yield (0, 1, None)  # Everything from current session
        return

    for range_str in ranges_str.split():
        rmatch = range_re.match(range_str)
        if not rmatch:
            continue

        start_text = rmatch.group("start")
        start: int
        end: Optional[int]
        if start_text:
            start = int(start_text)
            end_text = rmatch.group("end")
            if rmatch.group("sep") == "-":
                # "a-b" includes b; "a-" runs to the end of the session.
                end = int(end_text) + 1 if end_text else None
            else:
                # "a:b" excludes b; a bare "a" (or "a:") is that single line.
                end = int(end_text) if end_text else start + 1
        elif rmatch.group("startsess"):
            # Session given without a start line: take the whole session.
            start, end = 1, None
        else:
            continue

        startsess_text = rmatch.group("startsess") or "0"
        endsess_text = rmatch.group("endsess") or startsess_text
        # "~N/" means N sessions back, i.e. session number -N.
        startsess = int(startsess_text.rstrip("/").replace("~", "-"))
        endsess = int(endsess_text.rstrip("/").replace("~", "-"))
        if endsess < startsess:
            # Raised explicitly rather than through ``assert`` so that the
            # check also runs under ``python -O``; the exception type is
            # kept for backward compatibility.
            raise AssertionError("start session must be earlier than end session")

        if endsess == startsess:
            yield (startsess, start, end)
            continue
        # Multiple sessions in one range:
        yield (startsess, start, None)
        for sess in range(startsess + 1, endsess):
            yield (sess, 1, None)
        yield (endsess, 1, end)

1268 

1269 

1270def _format_lineno(session: int, line: int) -> str: 

1271 """Helper function to format line numbers properly.""" 

1272 if session == 0: 

1273 return str(line) 

1274 return "%s#%s" % (session, line)