1"""History related magics and functionality"""
2
3from __future__ import annotations
4
5# Copyright (c) IPython Development Team.
6# Distributed under the terms of the Modified BSD License.
7
8
9import atexit
10import datetime
11import re
12
13
14import threading
15from pathlib import Path
16
17from collections import defaultdict
18from contextlib import contextmanager
19from dataclasses import dataclass
20from decorator import decorator
21from traitlets import (
22 Any,
23 Bool,
24 Dict,
25 Instance,
26 Integer,
27 List,
28 TraitError,
29 Unicode,
30 Union,
31 default,
32 observe,
33)
34from traitlets.config.configurable import LoggingConfigurable
35
36from IPython.paths import locate_profile
37from IPython.utils.decorators import undoc
38from typing import Iterable, Tuple, Optional, TYPE_CHECKING
39import typing
40from warnings import warn
41from weakref import ref, WeakSet
42
43if TYPE_CHECKING:
44 from IPython.core.interactiveshell import InteractiveShell
45 from IPython.config.Configuration import Configuration
46
# sqlite3 may be missing from the running Python; ``sqlite3_found`` records
# whether it could be imported and is the default of ``HistoryAccessor.enabled``.
try:
    from sqlite3 import DatabaseError, OperationalError
    import sqlite3

    # Convert values of columns declared as "timestamp" from their stored
    # ISO-format bytes back into datetime objects.
    sqlite3.register_converter(
        "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode())
    )

    sqlite3_found = True
except ModuleNotFoundError:
    sqlite3_found = False

    # Stand-in exception types so that ``except (DatabaseError,
    # OperationalError)`` clauses in this module stay valid without sqlite3.
    class DatabaseError(Exception):  # type: ignore [no-redef]
        pass

    class OperationalError(Exception):  # type: ignore [no-redef]
        pass
64
65
# Payload of one history entry: the input source alone, or an
# (input, output) pair whose output is None when no output is found.
InOrInOut = typing.Union[str, tuple[str, Optional[str]]]
67
68# -----------------------------------------------------------------------------
69# Classes and functions
70# -----------------------------------------------------------------------------
71
72
@undoc
class DummyDB:
    """Null-object replacement for a SQLite connection.

    Queries yield no rows and writes are discarded. It is installed by
    ``init_db`` when the SQLite history is not enabled.
    """

    def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
        # There is deliberately no explicit ``self``: the instance simply
        # lands in ``args`` and is ignored like every other argument.
        return list()

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        return None

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        # Nothing to acquire; ``with db as x`` binds ``x`` to None.
        return None

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        # A falsy return value lets exceptions propagate out of the block.
        return None
90
91
@decorator
def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: call the method only if ``self.enabled``; otherwise return ``[]``."""
    if self.enabled:
        return f(self, *a, **kw)
    return []
99
100
# Use 16 kB as the size threshold at or above which a corrupt history db is
# kept under a unique name instead of being overwritten; a file that large
# should hold at least 100 entries or so.
103_SAVE_DB_SIZE = 16384
104
105
@decorator
def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator that lets HistoryAccessor methods survive a corrupt database.

    If the wrapped call raises ``DatabaseError`` or ``OperationalError``, the
    existing history file is moved out of the way, a new database is created
    with ``self.init_db()`` and ``[]`` is returned in place of the result.

    We avoid clobbering larger databases because this may be triggered due to
    filesystem issues, not just a corrupt file: a file of at least
    ``_SAVE_DB_SIZE`` bytes is renamed to a timestamped ``-corrupt-`` name
    that does not exist yet, a smaller one to a fixed ``-corrupt`` name.

    Once more than ``self._corrupt_db_limit`` failures have accumulated,
    ``self.hist_file`` is switched to ``:memory:`` instead. An error raised
    while already using ``:memory:`` is re-raised.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if self.hist_file == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # hist_file may be configured as a plain string; normalise it so
            # the pathlib operations below work for both str and Path values.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                base = str(hist_path.parent / hist_path.stem)
                ext = hist_path.suffix
                size = hist_path.stat().st_size
                if size >= _SAVE_DB_SIZE:
                    # if there's significant content, avoid clobbering
                    now = (
                        datetime.datetime.now(datetime.timezone.utc)
                        .isoformat()
                        .replace(":", ".")
                    )
                    newpath = base + "-corrupt-" + now + ext
                    # don't clobber previous corrupt backups: try numbered
                    # suffixes until an unused name is found
                    for i in range(100):
                        if not Path(newpath).exists():
                            break
                        newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
                else:
                    # not much content, possibly empty; don't worry about clobbering
                    # maybe we should just delete it?
                    newpath = base + "-corrupt" + ext
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []
158
159
class HistoryAccessorBase(LoggingConfigurable):
    """Abstract interface shared by history accessors.

    Every method raises ``NotImplementedError``; subclasses supply the
    actual lookups.
    """

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the last ``n`` history entries."""
        raise NotImplementedError()

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the history entries matching ``pattern``."""
        raise NotImplementedError()

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the entries of one session between ``start`` and ``stop``."""
        raise NotImplementedError()

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the entries selected by a range string."""
        raise NotImplementedError()
197
198
class HistoryAccessor(HistoryAccessorBase):
    """Access the history database without adding to it.

    This is intended for use by standalone history tools. IPython shells use
    HistoryManager, below, which is a subclass of this."""

    # Number of database failures seen by ``catch_corrupt_db`` since the last
    # successful ``init_db``, so we don't keep trying over and over.
    _corrupt_db_counter = 0
    # Once the counter exceeds this limit, fall back on :memory:
    _corrupt_db_limit = 2

    # Location of the history database: a Path, a plain string path, or the
    # special string ":memory:".
    hist_file = Union(
        [Instance(Path), Unicode()],
        help="""Path to file to use for SQLite history database.

        By default, IPython will put the history database in the IPython
        profile directory. If you would rather share one history among
        profiles, you can set this value in each, so that they are consistent.

        Due to an issue with fcntl, SQLite is known to misbehave on some NFS
        mounts. If you see IPython hanging, try setting this to something on a
        local disk, e.g::

            ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite

        you can also use the specific value `:memory:` (including the colon
        at both end but not the back ticks), to avoid creating an history file.

        """,
    ).tag(config=True)

    # Defaults to whether the sqlite3 module could be imported.
    enabled = Bool(
        sqlite3_found,
        help="""enable the SQLite history

        set enabled=False to disable the SQLite history,
        in which case there will be no stored history, no SQLite connection,
        and no background saving thread. This may be necessary in some
        threaded environments where IPython is embedded.
        """,
    ).tag(config=True)

    # Extra keyword arguments forwarded to ``sqlite3.connect``.
    connection_options = Dict(
        help="""Options for configuring the SQLite connection

        These options are passed as keyword args to sqlite3.connect
        when establishing database connections.
        """
    ).tag(config=True)
249
250 @default("connection_options")
251 def _default_connection_options(self) -> dict[str, bool]:
252 return dict(check_same_thread=False)
253
254 # The SQLite database
255 db = Any()
256
257 @observe("db")
258 @only_when_enabled
259 def _db_changed(self, change): # type: ignore [no-untyped-def]
260 """validate the db, since it can be an Instance of two different types"""
261 new = change["new"]
262 connection_types = (DummyDB, sqlite3.Connection)
263 if not isinstance(new, connection_types):
264 msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % (
265 self.__class__.__name__,
266 new,
267 )
268 raise TraitError(msg)
269
270 def __init__(
271 self, profile: str = "default", hist_file: str = "", **traits: typing.Any
272 ) -> None:
273 """Create a new history accessor.
274
275 Parameters
276 ----------
277 profile : str
278 The name of the profile from which to open history.
279 hist_file : str
280 Path to an SQLite history database stored by IPython. If specified,
281 hist_file overrides profile.
282 config : :class:`~traitlets.config.loader.Config`
283 Config object. hist_file can also be set through this.
284 """
285 super(HistoryAccessor, self).__init__(**traits)
286 # defer setting hist_file from kwarg until after init,
287 # otherwise the default kwarg value would clobber any value
288 # set by config
289 if hist_file:
290 self.hist_file = hist_file
291
292 try:
293 self.hist_file
294 except TraitError:
295 # No one has set the hist_file, yet.
296 self.hist_file = self._get_hist_file_name(profile)
297
298 self.init_db()
299
300 def _get_hist_file_name(self, profile: str = "default") -> Path:
301 """Find the history file for the given profile name.
302
303 This is overridden by the HistoryManager subclass, to use the shell's
304 active profile.
305
306 Parameters
307 ----------
308 profile : str
309 The name of a profile which has a history file.
310 """
311 return Path(locate_profile(profile)) / "history.sqlite"
312
313 @catch_corrupt_db
314 def init_db(self) -> None:
315 """Connect to the database, and create tables if necessary."""
316 if not self.enabled:
317 self.db = DummyDB()
318 return
319
320 # use detect_types so that timestamps return datetime objects
321 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
322 kwargs.update(self.connection_options)
323 self.db = sqlite3.connect(str(self.hist_file), **kwargs) # type: ignore [call-overload]
324 with self.db:
325 self.db.execute(
326 """CREATE TABLE IF NOT EXISTS sessions (session integer
327 primary key autoincrement, start timestamp,
328 end timestamp, num_cmds integer, remark text)"""
329 )
330 self.db.execute(
331 """CREATE TABLE IF NOT EXISTS history
332 (session integer, line integer, source text, source_raw text,
333 PRIMARY KEY (session, line))"""
334 )
335 # Output history is optional, but ensure the table's there so it can be
336 # enabled later.
337 self.db.execute(
338 """CREATE TABLE IF NOT EXISTS output_history
339 (session integer, line integer, output text,
340 PRIMARY KEY (session, line))"""
341 )
342 # success! reset corrupt db count
343 self._corrupt_db_counter = 0
344
345 def writeout_cache(self) -> None:
346 """Overridden by HistoryManager to dump the cache before certain
347 database lookups."""
348 pass
349
350 ## -------------------------------
351 ## Methods for retrieving history:
352 ## -------------------------------
353 def _run_sql(
354 self,
355 sql: str,
356 params: tuple,
357 raw: bool = True,
358 output: bool = False,
359 latest: bool = False,
360 ) -> Iterable[tuple[int, int, InOrInOut]]:
361 """Prepares and runs an SQL query for the history database.
362
363 Parameters
364 ----------
365 sql : str
366 Any filtering expressions to go after SELECT ... FROM ...
367 params : tuple
368 Parameters passed to the SQL query (to replace "?")
369 raw, output : bool
370 See :meth:`get_range`
371 latest : bool
372 Select rows with max (session, line)
373
374 Returns
375 -------
376 Tuples as :meth:`get_range`
377 """
378 toget = "source_raw" if raw else "source"
379 sqlfrom = "history"
380 if output:
381 sqlfrom = "history LEFT JOIN output_history USING (session, line)"
382 toget = "history.%s, output_history.output" % toget
383 if latest:
384 toget += ", MAX(session * 128 * 1024 + line)"
385 this_querry = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql
386 cur = self.db.execute(this_querry, params)
387 if latest:
388 cur = (row[:-1] for row in cur)
389 if output: # Regroup into 3-tuples, and parse JSON
390 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur)
391 return cur
392
393 @only_when_enabled
394 @catch_corrupt_db
395 def get_session_info(
396 self, session: int
397 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
398 """Get info about a session.
399
400 Parameters
401 ----------
402 session : int
403 Session number to retrieve.
404
405 Returns
406 -------
407 session_id : int
408 Session ID number
409 start : datetime
410 Timestamp for the start of the session.
411 end : datetime
412 Timestamp for the end of the session, or None if IPython crashed.
413 num_cmds : int
414 Number of commands run, or None if IPython crashed.
415 remark : str
416 A manually set description.
417 """
418 query = "SELECT * from sessions where session == ?"
419 return self.db.execute(query, (session,)).fetchone()
420
421 @catch_corrupt_db
422 def get_last_session_id(self) -> Optional[int]:
423 """Get the last session ID currently in the database.
424
425 Within IPython, this should be the same as the value stored in
426 :attr:`HistoryManager.session_number`.
427 """
428 for record in self.get_tail(n=1, include_latest=True):
429 return record[0]
430 return None
431
432 @catch_corrupt_db
433 def get_tail(
434 self,
435 n: int = 10,
436 raw: bool = True,
437 output: bool = False,
438 include_latest: bool = False,
439 ) -> Iterable[tuple[int, int, InOrInOut]]:
440 """Get the last n lines from the history database.
441
442 Parameters
443 ----------
444 n : int
445 The number of lines to get
446 raw, output : bool
447 See :meth:`get_range`
448 include_latest : bool
449 If False (default), n+1 lines are fetched, and the latest one
450 is discarded. This is intended to be used where the function
451 is called by a user command, which it should not return.
452
453 Returns
454 -------
455 Tuples as :meth:`get_range`
456 """
457 self.writeout_cache()
458 if not include_latest:
459 n += 1
460 cur = self._run_sql(
461 "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output
462 )
463 if not include_latest:
464 return reversed(list(cur)[1:])
465 return reversed(list(cur))
466
467 @catch_corrupt_db
468 def search(
469 self,
470 pattern: str = "*",
471 raw: bool = True,
472 search_raw: bool = True,
473 output: bool = False,
474 n: Optional[int] = None,
475 unique: bool = False,
476 ) -> Iterable[tuple[int, int, InOrInOut]]:
477 """Search the database using unix glob-style matching (wildcards
478 * and ?).
479
480 Parameters
481 ----------
482 pattern : str
483 The wildcarded pattern to match when searching
484 search_raw : bool
485 If True, search the raw input, otherwise, the parsed input
486 raw, output : bool
487 See :meth:`get_range`
488 n : None or int
489 If an integer is given, it defines the limit of
490 returned entries.
491 unique : bool
492 When it is true, return only unique entries.
493
494 Returns
495 -------
496 Tuples as :meth:`get_range`
497 """
498 tosearch = "source_raw" if search_raw else "source"
499 if output:
500 tosearch = "history." + tosearch
501 self.writeout_cache()
502 sqlform = "WHERE %s GLOB ?" % tosearch
503 params: tuple[typing.Any, ...] = (pattern,)
504 if unique:
505 sqlform += " GROUP BY {0}".format(tosearch)
506 if n is not None:
507 sqlform += " ORDER BY session DESC, line DESC LIMIT ?"
508 params += (n,)
509 elif unique:
510 sqlform += " ORDER BY session, line"
511 cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique)
512 if n is not None:
513 return reversed(list(cur))
514 return cur
515
516 @catch_corrupt_db
517 def get_range(
518 self,
519 session: int,
520 start: int = 1,
521 stop: Optional[int] = None,
522 raw: bool = True,
523 output: bool = False,
524 ) -> Iterable[tuple[int, int, InOrInOut]]:
525 """Retrieve input by session.
526
527 Parameters
528 ----------
529 session : int
530 Session number to retrieve.
531 start : int
532 First line to retrieve.
533 stop : int
534 End of line range (excluded from output itself). If None, retrieve
535 to the end of the session.
536 raw : bool
537 If True, return untranslated input
538 output : bool
539 If True, attempt to include output. This will be 'real' Python
540 objects for the current session, or text reprs from previous
541 sessions if db_log_output was enabled at the time. Where no output
542 is found, None is used.
543
544 Returns
545 -------
546 entries
547 An iterator over the desired lines. Each line is a 3-tuple, either
548 (session, line, input) if output is False, or
549 (session, line, (input, output)) if output is True.
550 """
551 params: tuple[typing.Any, ...]
552 if stop:
553 lineclause = "line >= ? AND line < ?"
554 params = (session, start, stop)
555 else:
556 lineclause = "line>=?"
557 params = (session, start)
558
559 return self._run_sql(
560 "WHERE session==? AND %s" % lineclause, params, raw=raw, output=output
561 )
562
563 def get_range_by_str(
564 self, rangestr: str, raw: bool = True, output: bool = False
565 ) -> Iterable[tuple[int, int, InOrInOut]]:
566 """Get lines of history from a string of ranges, as used by magic
567 commands %hist, %save, %macro, etc.
568
569 Parameters
570 ----------
571 rangestr : str
572 A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used,
573 this will return everything from current session's history.
574
575 See the documentation of :func:`%history` for the full details.
576
577 raw, output : bool
578 As :meth:`get_range`
579
580 Returns
581 -------
582 Tuples as :meth:`get_range`
583 """
584 for sess, s, e in extract_hist_ranges(rangestr):
585 yield from self.get_range(sess, s, e, raw=raw, output=output)
586
587
@dataclass
class HistoryOutput:
    """One recorded output, tagged with the kind of output it is."""

    # Which kind of output this entry records.
    output_type: typing.Literal[
        "out_stream", "err_stream", "display_data", "execute_result"
    ]
    # The output payload as a string-to-string mapping.
    bundle: typing.Dict[str, str]
594
595
class HistoryManager(HistoryAccessor):
    """A class to organize all history-related functionality in one place."""

    # Public interface

    # The IPython shell we are attached to; None is not accepted.
    shell = Instance(
        "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False
    )
    # Lists to hold processed and raw history. These start with a blank entry
    # so that we can index them starting from 1
    input_hist_parsed = List([""])
    input_hist_raw = List([""])
    # A list of directories visited during session
    dir_hist: List = List()
611
612 @default("dir_hist")
613 def _dir_hist_default(self) -> list[Path]:
614 try:
615 return [Path.cwd()]
616 except OSError:
617 return []
618
    # A dict of output history, keyed with ints from the shell's
    # execution count.
    output_hist = Dict()
    # The text/plain repr of outputs.
    output_hist_reprs: typing.Dict[int, str] = Dict()  # type: ignore [assignment]
    # Maps execution_count to MIME bundles
    # NOTE(review): this is a plain class attribute rather than a trait, so
    # the defaultdict created here is one object shared through the class.
    outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list)
    # Maps execution_count to exception tracebacks
    exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict()  # type: ignore [assignment]

    # The number of the current session in the history database
    session_number: int = Integer()  # type: ignore [assignment]

    db_log_output = Bool(
        False, help="Should the history database include output? (default: no)"
    ).tag(config=True)
    db_cache_size = Integer(
        0,
        help="Write to database every x commands (higher values save disk access & power).\n"
        "Values of 1 or less effectively disable caching.",
    ).tag(config=True)
    # The input and output caches: entries waiting to be written to the
    # database by ``writeout_cache``.
    db_input_cache: List[tuple[int, str, str]] = List()
    db_output_cache: List[tuple[int, str]] = List()

    # History saving in separate thread; None when no thread was created.
    save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True)
646
647 @property
648 def save_flag(self) -> threading.Event | None:
649 if self.save_thread is not None:
650 return self.save_thread.save_flag
651 return None
652
    # Private interface
    # Variables used to store the most recent inputs from the user. On each
    # new history update, we populate the user's namespace with these, shifted
    # as necessary (see ``store_inputs``).
    _i00 = Unicode("")
    _i = Unicode("")
    _ii = Unicode("")
    _iii = Unicode("")

    # A regex matching all forms of the exit command, so that we don't store
    # them in the history (it's annoying to rewind the first entry and land on
    # an exit call).
    _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$")

    # Weak set of the managers created so far, and the upper bound on its
    # size that ``__init__`` asserts (unbounded by default).
    _instances: WeakSet[HistoryManager] = WeakSet()
    _max_inst: int | float = float("inf")
669
670 def __init__(
671 self,
672 shell: InteractiveShell,
673 config: Optional[Configuration] = None,
674 **traits: typing.Any,
675 ):
676 """Create a new history manager associated with a shell instance."""
677 super().__init__(shell=shell, config=config, **traits)
678 self.db_input_cache_lock = threading.Lock()
679 self.db_output_cache_lock = threading.Lock()
680
681 try:
682 self.new_session()
683 except OperationalError:
684 self.log.error(
685 "Failed to create history session in %s. History will not be saved.",
686 self.hist_file,
687 exc_info=True,
688 )
689 self.hist_file = ":memory:"
690
691 if self.enabled and self.hist_file != ":memory:":
692 self.save_thread = HistorySavingThread(self)
693 try:
694 self.save_thread.start()
695 except RuntimeError:
696 self.log.error(
697 "Failed to start history saving thread. History will not be saved.",
698 exc_info=True,
699 )
700 self.hist_file = ":memory:"
701 self._instances.add(self)
702 assert len(HistoryManager._instances) <= HistoryManager._max_inst, (
703 len(HistoryManager._instances),
704 HistoryManager._max_inst,
705 )
706
707 def __del__(self) -> None:
708 if self.save_thread is not None:
709 self.save_thread.stop()
710
711 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path:
712 """Get default history file name based on the Shell's profile.
713
714 The profile parameter is ignored, but must exist for compatibility with
715 the parent class."""
716 profile_dir = self.shell.profile_dir.location
717 return Path(profile_dir) / "history.sqlite"
718
719 @only_when_enabled
720 def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None:
721 """Get a new session number."""
722 if conn is None:
723 conn = self.db
724
725 with conn:
726 cur = conn.execute(
727 """INSERT INTO sessions VALUES (NULL, ?, NULL,
728 NULL, '') """,
729 (datetime.datetime.now().isoformat(" "),),
730 )
731 assert isinstance(cur.lastrowid, int)
732 self.session_number = cur.lastrowid
733
734 def end_session(self) -> None:
735 """Close the database session, filling in the end time and line count."""
736 self.writeout_cache()
737 with self.db:
738 self.db.execute(
739 """UPDATE sessions SET end=?, num_cmds=? WHERE
740 session==?""",
741 (
742 datetime.datetime.now(datetime.timezone.utc).isoformat(" "),
743 len(self.input_hist_parsed) - 1,
744 self.session_number,
745 ),
746 )
747 self.session_number = 0
748
749 def name_session(self, name: str) -> None:
750 """Give the current session a name in the history database."""
751 warn(
752 "name_session is deprecated in IPython 9.0 and will be removed in future versions",
753 DeprecationWarning,
754 stacklevel=2,
755 )
756 with self.db:
757 self.db.execute(
758 "UPDATE sessions SET remark=? WHERE session==?",
759 (name, self.session_number),
760 )
761
762 def reset(self, new_session: bool = True) -> None:
763 """Clear the session history, releasing all object references, and
764 optionally open a new session."""
765 self.output_hist.clear()
766 self.outputs.clear()
767 self.exceptions.clear()
768
769 # The directory history can't be completely empty
770 self.dir_hist[:] = [Path.cwd()]
771
772 if new_session:
773 if self.session_number:
774 self.end_session()
775 self.input_hist_parsed[:] = [""]
776 self.input_hist_raw[:] = [""]
777 self.new_session()
778
779 # ------------------------------
780 # Methods for retrieving history
781 # ------------------------------
782 def get_session_info(
783 self, session: int = 0
784 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
785 """Get info about a session.
786
787 Parameters
788 ----------
789 session : int
790 Session number to retrieve. The current session is 0, and negative
791 numbers count back from current session, so -1 is the previous session.
792
793 Returns
794 -------
795 session_id : int
796 Session ID number
797 start : datetime
798 Timestamp for the start of the session.
799 end : datetime
800 Timestamp for the end of the session, or None if IPython crashed.
801 num_cmds : int
802 Number of commands run, or None if IPython crashed.
803 remark : str
804 A manually set description.
805 """
806 if session <= 0:
807 session += self.session_number
808
809 return super(HistoryManager, self).get_session_info(session=session)
810
811 @catch_corrupt_db
812 def get_tail(
813 self,
814 n: int = 10,
815 raw: bool = True,
816 output: bool = False,
817 include_latest: bool = False,
818 ) -> Iterable[tuple[int, int, InOrInOut]]:
819 """Get the last n lines from the history database.
820
821 Most recent entry last.
822
823 Completion will be reordered so that that the last ones are when
824 possible from current session.
825
826 Parameters
827 ----------
828 n : int
829 The number of lines to get
830 raw, output : bool
831 See :meth:`get_range`
832 include_latest : bool
833 If False (default), n+1 lines are fetched, and the latest one
834 is discarded. This is intended to be used where the function
835 is called by a user command, which it should not return.
836
837 Returns
838 -------
839 Tuples as :meth:`get_range`
840 """
841 self.writeout_cache()
842 if not include_latest:
843 n += 1
844 # cursor/line/entry
845 this_cur = list(
846 self._run_sql(
847 "WHERE session == ? ORDER BY line DESC LIMIT ? ",
848 (self.session_number, n),
849 raw=raw,
850 output=output,
851 )
852 )
853 other_cur = list(
854 self._run_sql(
855 "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?",
856 (self.session_number, n),
857 raw=raw,
858 output=output,
859 )
860 )
861
862 everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur
863
864 everything = everything[:n]
865
866 if not include_latest:
867 return list(everything)[:0:-1]
868 return list(everything)[::-1]
869
870 def _get_range_session(
871 self,
872 start: int = 1,
873 stop: Optional[int] = None,
874 raw: bool = True,
875 output: bool = False,
876 ) -> Iterable[tuple[int, int, InOrInOut]]:
877 """Get input and output history from the current session. Called by
878 get_range, and takes similar parameters."""
879 input_hist = self.input_hist_raw if raw else self.input_hist_parsed
880
881 n = len(input_hist)
882 if start < 0:
883 start += n
884 if not stop or (stop > n):
885 stop = n
886 elif stop < 0:
887 stop += n
888 line: InOrInOut
889 for i in range(start, stop):
890 if output:
891 line = (input_hist[i], self.output_hist_reprs.get(i))
892 else:
893 line = input_hist[i]
894 yield (0, i, line)
895
896 def get_range(
897 self,
898 session: int = 0,
899 start: int = 1,
900 stop: Optional[int] = None,
901 raw: bool = True,
902 output: bool = False,
903 ) -> Iterable[tuple[int, int, InOrInOut]]:
904 """Retrieve input by session.
905
906 Parameters
907 ----------
908 session : int
909 Session number to retrieve. The current session is 0, and negative
910 numbers count back from current session, so -1 is previous session.
911 start : int
912 First line to retrieve.
913 stop : int
914 End of line range (excluded from output itself). If None, retrieve
915 to the end of the session.
916 raw : bool
917 If True, return untranslated input
918 output : bool
919 If True, attempt to include output. This will be 'real' Python
920 objects for the current session, or text reprs from previous
921 sessions if db_log_output was enabled at the time. Where no output
922 is found, None is used.
923
924 Returns
925 -------
926 entries
927 An iterator over the desired lines. Each line is a 3-tuple, either
928 (session, line, input) if output is False, or
929 (session, line, (input, output)) if output is True.
930 """
931 if session <= 0:
932 session += self.session_number
933 if session == self.session_number: # Current session
934 return self._get_range_session(start, stop, raw, output)
935 return super(HistoryManager, self).get_range(session, start, stop, raw, output)
936
937 ## ----------------------------
938 ## Methods for storing history:
939 ## ----------------------------
940 def store_inputs(
941 self, line_num: int, source: str, source_raw: Optional[str] = None
942 ) -> None:
943 """Store source and raw input in history and create input cache
944 variables ``_i*``.
945
946 Parameters
947 ----------
948 line_num : int
949 The prompt number of this input.
950 source : str
951 Python input.
952 source_raw : str, optional
953 If given, this is the raw input without any IPython transformations
954 applied to it. If not given, ``source`` is used.
955 """
956 if source_raw is None:
957 source_raw = source
958 source = source.rstrip("\n")
959 source_raw = source_raw.rstrip("\n")
960
961 # do not store exit/quit commands
962 if self._exit_re.match(source_raw.strip()):
963 return
964
965 self.input_hist_parsed.append(source)
966 self.input_hist_raw.append(source_raw)
967
968 with self.db_input_cache_lock:
969 self.db_input_cache.append((line_num, source, source_raw))
970 # Trigger to flush cache and write to DB.
971 if len(self.db_input_cache) >= self.db_cache_size:
972 if self.save_flag:
973 self.save_flag.set()
974
975 # update the auto _i variables
976 self._iii = self._ii
977 self._ii = self._i
978 self._i = self._i00
979 self._i00 = source_raw
980
981 # hackish access to user namespace to create _i1,_i2... dynamically
982 new_i = "_i%s" % line_num
983 to_main = {"_i": self._i, "_ii": self._ii, "_iii": self._iii, new_i: self._i00}
984
985 if self.shell is not None:
986 self.shell.push(to_main, interactive=False)
987
988 def store_output(self, line_num: int) -> None:
989 """If database output logging is enabled, this saves all the
990 outputs from the indicated prompt number to the database. It's
991 called by run_cell after code has been executed.
992
993 Parameters
994 ----------
995 line_num : int
996 The line number from which to save outputs
997 """
998 if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
999 return
1000 lnum: int = line_num
1001 output = self.output_hist_reprs[line_num]
1002
1003 with self.db_output_cache_lock:
1004 self.db_output_cache.append((line_num, output))
1005 if self.db_cache_size <= 1 and self.save_flag is not None:
1006 self.save_flag.set()
1007
1008 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None:
1009 with conn:
1010 for line in self.db_input_cache:
1011 conn.execute(
1012 "INSERT INTO history VALUES (?, ?, ?, ?)",
1013 (self.session_number,) + line,
1014 )
1015
1016 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None:
1017 with conn:
1018 for line in self.db_output_cache:
1019 conn.execute(
1020 "INSERT INTO output_history VALUES (?, ?, ?)",
1021 (self.session_number,) + line,
1022 )
1023
    @only_when_enabled
    def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
        """Write any entries in the cache to the database.

        Both caches are emptied afterwards, whether or not writing succeeded.

        Parameters
        ----------
        conn : sqlite3.Connection, optional
            Connection to write through; defaults to ``self.db``.
        """
        if conn is None:
            conn = self.db

        with self.db_input_cache_lock:
            try:
                self._writeout_input_cache(conn)
            except sqlite3.IntegrityError:
                # A (session, line) pair already exists in the database:
                # switch to a new session and retry once.
                self.new_session(conn)
                print(
                    "ERROR! Session/line number was not unique in",
                    "database. History logging moved to new session",
                    self.session_number,
                )
                try:
                    # Try writing to the new session. If this fails, don't
                    # recurse
                    self._writeout_input_cache(conn)
                except sqlite3.IntegrityError:
                    pass
            finally:
                # Drop the cached inputs even if they could not be written.
                self.db_input_cache = []

        with self.db_output_cache_lock:
            try:
                self._writeout_output_cache(conn)
            except sqlite3.IntegrityError:
                # Unlike inputs, outputs are not retried.
                print(
                    "!! Session/line number for output was not unique",
                    "in database. Output will not be stored.",
                )
            finally:
                self.db_output_cache = []
1059
1060
1061from typing import Callable, Iterator
1062from weakref import ReferenceType
1063
1064
@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that keeps the target of a weak reference alive.

    On entry the weak reference is dereferenced and the result is kept in a
    local variable, so the referent (if it still exists) cannot be garbage
    collected while the ``with`` body runs. The weak reference itself is
    what gets yielded; the strong reference is dropped once the body
    completes.
    """
    # Calling the weakref gives a strong reference, or None if the referent
    # is already gone.
    strong = ref()
    yield ref
    del strong
1074
1075
class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for its save_flag to be set, then asks the HistoryManager to
    write out the history cache. The main thread is responsible for setting
    the flag when the cache size reaches a defined threshold.

    Only a weak reference to the HistoryManager is stored, so this thread
    does not keep the manager alive. If the manager has been garbage
    collected when the thread wakes up, the thread exits.
    """

    # Event the thread blocks on; set it to request a write-out (or a stop).
    save_flag: threading.Event
    daemon: bool = True
    # Set by stop() (or when the manager is gone) to make run() return.
    _stop_now: bool = False
    # Copied from the manager in __init__; checked by @only_when_enabled.
    enabled: bool = True
    history_manager: ref[HistoryManager]
    # Makes stop() idempotent.
    _stopped = False

    def __init__(self, history_manager: HistoryManager) -> None:
        super().__init__(name="IPythonHistorySavingThread")
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled
        self.save_flag = threading.Event()

    @only_when_enabled
    def run(self) -> None:
        """Open a per-thread connection and write the cache whenever woken.

        The loop ends when ``stop()`` has been called or when the
        HistoryManager has been garbage collected. The connection is closed
        on every exit path, and any unexpected exception is reported with
        ``print`` instead of propagating.
        """
        atexit.register(self.stop)
        try:
            hm: ReferenceType[HistoryManager]
            # We need a separate db connection per thread:
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone: there is nothing to save
                    # and no connection that would need closing later.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **hm().connection_options,  # type: ignore [union-attr]
                )
            try:
                while True:
                    self.save_flag.wait()
                    # hold() keeps the manager alive for the duration of the
                    # write; between wake-ups no strong reference is kept.
                    with hold(self.history_manager) as hm:
                        if hm() is None:
                            self._stop_now = True
                        if self._stop_now:
                            return
                        self.save_flag.clear()
                        if hm() is not None:
                            hm().writeout_cache(self.db)  # type: ignore [union-attr]
            finally:
                # Close the connection on normal exit and on errors alike.
                self.db.close()

        except Exception as e:
            print(
                f"The history saving thread hit an unexpected error ({e!r}). "
                "History will not be written to the database."
            )
        finally:
            atexit.unregister(self.stop)

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method.

        Calling it more than once is harmless. It waits for the thread to
        finish only when the thread is running and is not the caller.
        """
        if self._stopped:
            return
        self._stop_now = True

        # Wake run() up so that it notices _stop_now.
        self.save_flag.set()
        self._stopped = True
        # Thread.join() raises RuntimeError for a thread that was never
        # started and for the current thread, so skip it in both cases.
        if self is not threading.current_thread() and self.is_alive():
            self.join()

    def __del__(self) -> None:
        self.stop()
1149
1150
# To match, e.g. ~5/8-~2/3
#
# A range token is: [startsess/][start][sep [endsess/] end]
#   * startsess / endsess: a session number, optionally prefixed with "~"
#   * start / end: line numbers
#   * sep: "-" or ":"
range_re = re.compile(
    r"""
((?P<startsess>~?\d+)/)?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>~?\d+)/)?
 (?P<end>\d+))?
$""",
    re.VERBOSE,
)


def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    Whitespace-separated tokens that do not match the range syntax, or that
    name neither a start line nor a start session, are silently skipped.
    When a session is given without a start line, the range starts at line 1;
    an end line given in that form is honoured (``~2/-5`` is lines 1 to 5 of
    session -2).

    Parameters
    ----------
    ranges_str : str
        Whitespace-separated range tokens, e.g. ``"~8/5-~7/4 2"``. A ``~``
        before a session number is turned into a minus sign.

    Yields
    ------
    tuple of (int, int, int or None)
        ``(session, start, stop)`` where ``stop`` is exclusive and ``None``
        means "to the end of the session". A token spanning several sessions
        yields one tuple per session.

    Raises
    ------
    AssertionError
        If the end session of a token is earlier than its start session.

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    >>> list(extract_hist_ranges("~2/-5"))
    [(-2, 1, 6)]
    """
    if ranges_str == "":
        yield (0, 1, None)  # Everything from current session
        return

    for range_str in ranges_str.split():
        rmatch = range_re.match(range_str)
        if not rmatch:
            continue

        start_str = rmatch.group("start")
        end_str = rmatch.group("end")
        start: int
        end: Optional[int]
        if start_str:
            start = int(start_str)
            # If no end specified, get (a, a + 1)
            end = int(end_str) if end_str else start + 1
        else:  # start not specified
            if not rmatch.group("startsess"):  # no startsess
                continue
            start = 1
            # Without an end, provide the entire session hist; otherwise
            # honour the requested end instead of dropping it.
            end = int(end_str) if end_str else None

        if rmatch.group("sep") == "-":  # 1-3 == 1:4 --> [1, 2, 3]
            # The regex only allows a separator together with an end line.
            assert end is not None
            end += 1

        startsess_str = rmatch.group("startsess") or "0"
        endsess_str = rmatch.group("endsess") or startsess_str
        startsess = int(startsess_str.replace("~", "-"))
        endsess = int(endsess_str.replace("~", "-"))
        assert endsess >= startsess, "start session must be earlier than end session"

        if endsess == startsess:
            yield (startsess, start, end)
            continue
        # Multiple sessions in one range:
        yield (startsess, start, None)
        for sess in range(startsess + 1, endsess):
            yield (sess, 1, None)
        yield (endsess, 1, end)
1212
1213
def _format_lineno(session: int, line: int) -> str:
    """Render a history line number for display.

    Lines of session 0 are shown as the bare line number; any other session
    is shown as ``<session>#<line>``.
    """
    return str(line) if session == 0 else f"{session!s}#{line!s}"