Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""History related magics and functionality"""
3from __future__ import annotations
5# Copyright (c) IPython Development Team.
6# Distributed under the terms of the Modified BSD License.
9import atexit
10import datetime
11import re
14import threading
15from pathlib import Path
17from collections import defaultdict
18from contextlib import contextmanager
19from dataclasses import dataclass
20from decorator import decorator
21from traitlets import (
22 Any,
23 Bool,
24 Dict,
25 Instance,
26 Integer,
27 List,
28 TraitError,
29 Unicode,
30 Union,
31 default,
32 observe,
33)
34from traitlets.config.configurable import LoggingConfigurable
36from IPython.paths import locate_profile
37from IPython.utils.decorators import undoc
38from typing import Iterable, Tuple, Optional, TYPE_CHECKING
39import typing
40from warnings import warn
41from weakref import ref, WeakSet
43if TYPE_CHECKING:
44 from IPython.core.interactiveshell import InteractiveShell
45 from IPython.config.Configuration import Configuration
# sqlite3 is optional: when the module cannot be imported, history falls back
# to placeholder exception types so the rest of this file can still name them.
try:
    import sqlite3
    from sqlite3 import DatabaseError, OperationalError
except ModuleNotFoundError:
    sqlite3_found = False

    class DatabaseError(Exception):  # type: ignore [no-redef]
        pass

    class OperationalError(Exception):  # type: ignore [no-redef]
        pass

else:

    def _timestamp_converter(val: bytes) -> datetime.datetime:
        """Decode a stored ISO-8601 value into a ``datetime`` object."""
        return datetime.datetime.fromisoformat(val.decode())

    # Columns declared as "timestamp" are handed to the converter above.
    sqlite3.register_converter("timestamp", _timestamp_converter)

    sqlite3_found = True
66InOrInOut = typing.Union[str, tuple[str, Optional[str]]]
68# -----------------------------------------------------------------------------
69# Classes and functions
70# -----------------------------------------------------------------------------
@undoc
class DummyDB:
    """Stand-in database object that silently discards everything.

    Only used in the absence of sqlite.  It offers the few connection
    methods this module calls (``execute``, ``commit`` and the context
    manager protocol); every one of them is a no-op.
    """

    def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
        # No explicit ``self``: the instance simply arrives as ``args[0]``.
        return list()

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        return None

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        return None

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        return None
@decorator
def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: call the wrapped method only if ``self.enabled`` is true.

    When ``self.enabled`` is false the wrapped method is not called and an
    empty list is returned instead.
    """
    if self.enabled:
        return f(self, *a, **kw)
    return []
101# use 16kB as threshold for whether a corrupt history db should be saved
102# that should be at least 100 entries or so
103_SAVE_DB_SIZE = 16384
@decorator
def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """A decorator which wraps HistoryAccessor method calls to catch errors from
    a corrupt SQLite database, move the old database out of the way, and create
    a new one.

    We avoid clobbering larger databases because this may be triggered due to filesystem issues,
    not just a corrupt file.

    On ``DatabaseError``/``OperationalError`` the wrapped call is abandoned and
    ``[]`` is returned after ``self.init_db()`` has been re-run.  Once
    ``self._corrupt_db_counter`` exceeds ``self._corrupt_db_limit`` the
    accessor switches to ``":memory:"``; if the failure happens while already
    on ``":memory:"`` the exception is re-raised.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if self.hist_file == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # hist_file may be a plain string (the trait accepts both Path and
            # str), so normalise before using any Path-only methods.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                newpath = _corrupt_backup_path(hist_path)
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []


def _corrupt_backup_path(hist_path: Path) -> str:
    """Choose where a corrupt history file at *hist_path* should be moved to."""
    base = str(hist_path.parent / hist_path.stem)
    ext = hist_path.suffix
    if hist_path.stat().st_size < _SAVE_DB_SIZE:
        # not much content, possibly empty; don't worry about clobbering
        # maybe we should just delete it?
        return base + "-corrupt" + ext

    # if there's significant content, avoid clobbering
    # (":" is replaced because it is not valid in file names everywhere)
    now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace(":", ".")
    newpath = base + "-corrupt-" + now + ext
    # don't clobber previous corrupt backups
    for i in range(100):
        if not Path(newpath).exists():
            break
        newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
    return newpath
class HistoryAccessorBase(LoggingConfigurable):
    """An abstract class for History Accessors

    Every method here raises ``NotImplementedError``; concrete accessors
    override them.
    """

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Not implemented in the base class."""
        raise NotImplementedError()

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Not implemented in the base class."""
        raise NotImplementedError()
class HistoryAccessor(HistoryAccessorBase):
    """Access the history database without adding to it.

    This is intended for use by standalone history tools. IPython shells use
    HistoryManager, below, which is a subclass of this."""

    # counter for init_db retries, so we don't keep trying over and over
    _corrupt_db_counter = 0
    # after two failures, fallback on :memory:
    _corrupt_db_limit = 2

    # String holding the path to the history file
    hist_file = Union(
        [Instance(Path), Unicode()],
        help="""Path to file to use for SQLite history database.

        By default, IPython will put the history database in the IPython
        profile directory. If you would rather share one history among
        profiles, you can set this value in each, so that they are consistent.

        Due to an issue with fcntl, SQLite is known to misbehave on some NFS
        mounts. If you see IPython hanging, try setting this to something on a
        local disk, e.g::

            ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite

        you can also use the specific value `:memory:` (including the colon
        at both end but not the back ticks), to avoid creating an history file.

        """,
    ).tag(config=True)

    enabled = Bool(
        sqlite3_found,
        help="""enable the SQLite history

        set enabled=False to disable the SQLite history,
        in which case there will be no stored history, no SQLite connection,
        and no background saving thread. This may be necessary in some
        threaded environments where IPython is embedded.
        """,
    ).tag(config=True)

    connection_options = Dict(
        help="""Options for configuring the SQLite connection

        These options are passed as keyword args to sqlite3.connect
        when establishing database connections.
        """
    ).tag(config=True)

    @default("connection_options")
    def _default_connection_options(self) -> dict[str, bool]:
        """Default keyword arguments for ``sqlite3.connect``."""
        return dict(check_same_thread=False)

    # The SQLite database
    db = Any()

    @observe("db")
    @only_when_enabled
    def _db_changed(self, change):  # type: ignore [no-untyped-def]
        """validate the db, since it can be an Instance of two different types"""
        new = change["new"]
        connection_types = (DummyDB, sqlite3.Connection)
        if not isinstance(new, connection_types):
            msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % (
                self.__class__.__name__,
                new,
            )
            raise TraitError(msg)

    def __init__(
        self, profile: str = "default", hist_file: str = "", **traits: typing.Any
    ) -> None:
        """Create a new history accessor.

        Parameters
        ----------
        profile : str
            The name of the profile from which to open history.
        hist_file : str
            Path to an SQLite history database stored by IPython. If specified,
            hist_file overrides profile.
        config : :class:`~traitlets.config.loader.Config`
            Config object. hist_file can also be set through this.
        """
        super(HistoryAccessor, self).__init__(**traits)
        # defer setting hist_file from kwarg until after init,
        # otherwise the default kwarg value would clobber any value
        # set by config
        if hist_file:
            self.hist_file = hist_file

        try:
            self.hist_file
        except TraitError:
            # No one has set the hist_file, yet.
            self.hist_file = self._get_hist_file_name(profile)

        self.init_db()

    def _get_hist_file_name(self, profile: str = "default") -> Path:
        """Find the history file for the given profile name.

        This is overridden by the HistoryManager subclass, to use the shell's
        active profile.

        Parameters
        ----------
        profile : str
            The name of a profile which has a history file.
        """
        return Path(locate_profile(profile)) / "history.sqlite"

    @catch_corrupt_db
    def init_db(self) -> None:
        """Connect to the database, and create tables if necessary.

        When history is disabled a :class:`DummyDB` is installed instead.
        If creating the tables fails, the freshly opened connection is closed
        before the error propagates, so that no handle on the file is left
        behind for the ``catch_corrupt_db`` recovery path to trip over.
        """
        if not self.enabled:
            self.db = DummyDB()
            return

        # use detect_types so that timestamps return datetime objects
        kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
        kwargs.update(self.connection_options)
        conn = sqlite3.connect(str(self.hist_file), **kwargs)  # type: ignore [call-overload]
        try:
            with conn:
                conn.execute(
                    """CREATE TABLE IF NOT EXISTS sessions (session integer
                            primary key autoincrement, start timestamp,
                            end timestamp, num_cmds integer, remark text)"""
                )
                conn.execute(
                    """CREATE TABLE IF NOT EXISTS history
                        (session integer, line integer, source text, source_raw text,
                        PRIMARY KEY (session, line))"""
                )
                # Output history is optional, but ensure the table's there so it can be
                # enabled later.
                conn.execute(
                    """CREATE TABLE IF NOT EXISTS output_history
                            (session integer, line integer, output text,
                            PRIMARY KEY (session, line))"""
                )
        except BaseException:
            # ``with conn`` only ends the transaction; release the handle
            # explicitly before the caller tries to move the file away.
            conn.close()
            raise
        self.db = conn
        # success! reset corrupt db count
        self._corrupt_db_counter = 0

    def writeout_cache(self) -> None:
        """Overridden by HistoryManager to dump the cache before certain
        database lookups."""
        pass

    ## -------------------------------
    ## Methods for retrieving history:
    ## -------------------------------
    def _run_sql(
        self,
        sql: str,
        params: tuple,
        raw: bool = True,
        output: bool = False,
        latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Prepares and runs an SQL query for the history database.

        Parameters
        ----------
        sql : str
            Any filtering expressions to go after SELECT ... FROM ...
        params : tuple
            Parameters passed to the SQL query (to replace "?")
        raw, output : bool
            See :meth:`get_range`
        latest : bool
            Select rows with max (session, line)

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        toget = "source_raw" if raw else "source"
        sqlfrom = "history"
        if output:
            sqlfrom = "history LEFT JOIN output_history USING (session, line)"
            toget = "history.%s, output_history.output" % toget
        if latest:
            # Extra aggregate column; it is stripped from each row below.
            toget += ", MAX(session * 128 * 1024 + line)"
        query = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql
        cur = self.db.execute(query, params)
        if latest:
            cur = (row[:-1] for row in cur)
        if output:
            # Regroup into 3-tuples: (session, line, (input, output))
            return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur)
        return cur

    @only_when_enabled
    @catch_corrupt_db
    def get_session_info(
        self, session: int
    ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
        """Get info about a session.

        Parameters
        ----------
        session : int
            Session number to retrieve.

        Returns
        -------
        session_id : int
            Session ID number
        start : datetime
            Timestamp for the start of the session.
        end : datetime
            Timestamp for the end of the session, or None if IPython crashed.
        num_cmds : int
            Number of commands run, or None if IPython crashed.
        remark : str
            A manually set description.
        """
        query = "SELECT * from sessions where session == ?"
        return self.db.execute(query, (session,)).fetchone()

    @catch_corrupt_db
    def get_last_session_id(self) -> Optional[int]:
        """Get the last session ID currently in the database.

        Within IPython, this should be the same as the value stored in
        :attr:`HistoryManager.session_number`.
        """
        for record in self.get_tail(n=1, include_latest=True):
            return record[0]
        return None

    @catch_corrupt_db
    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Get the last n lines from the history database.

        Parameters
        ----------
        n : int
            The number of lines to get
        raw, output : bool
            See :meth:`get_range`
        include_latest : bool
            If False (default), n+1 lines are fetched, and the latest one
            is discarded. This is intended to be used where the function
            is called by a user command, which it should not return.

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        self.writeout_cache()
        if not include_latest:
            n += 1
        cur = self._run_sql(
            "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output
        )
        if not include_latest:
            # rows arrive newest-first, so the latest entry is the first one
            return reversed(list(cur)[1:])
        return reversed(list(cur))

    @catch_corrupt_db
    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Search the database using unix glob-style matching (wildcards
        * and ?).

        Parameters
        ----------
        pattern : str
            The wildcarded pattern to match when searching
        search_raw : bool
            If True, search the raw input, otherwise, the parsed input
        raw, output : bool
            See :meth:`get_range`
        n : None or int
            If an integer is given, it defines the limit of
            returned entries.
        unique : bool
            When it is true, return only unique entries.

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        tosearch = "source_raw" if search_raw else "source"
        if output:
            tosearch = "history." + tosearch
        self.writeout_cache()
        sqlform = "WHERE %s GLOB ?" % tosearch
        params: tuple[typing.Any, ...] = (pattern,)
        if unique:
            sqlform += " GROUP BY {0}".format(tosearch)
        if n is not None:
            sqlform += " ORDER BY session DESC, line DESC LIMIT ?"
            params += (n,)
        elif unique:
            sqlform += " ORDER BY session, line"
        cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique)
        if n is not None:
            return reversed(list(cur))
        return cur

    @catch_corrupt_db
    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Retrieve input by session.

        Parameters
        ----------
        session : int
            Session number to retrieve.
        start : int
            First line to retrieve.
        stop : int
            End of line range (excluded from output itself). If None, retrieve
            to the end of the session.
        raw : bool
            If True, return untranslated input
        output : bool
            If True, attempt to include output. This will be 'real' Python
            objects for the current session, or text reprs from previous
            sessions if db_log_output was enabled at the time. Where no output
            is found, None is used.

        Returns
        -------
        entries
            An iterator over the desired lines. Each line is a 3-tuple, either
            (session, line, input) if output is False, or
            (session, line, (input, output)) if output is True.
        """
        params: tuple[typing.Any, ...]
        if stop:
            lineclause = "line >= ? AND line < ?"
            params = (session, start, stop)
        else:
            lineclause = "line>=?"
            params = (session, start)

        return self._run_sql(
            "WHERE session==? AND %s" % lineclause, params, raw=raw, output=output
        )

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Get lines of history from a string of ranges, as used by magic
        commands %hist, %save, %macro, etc.

        Parameters
        ----------
        rangestr : str
            A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used,
            this will return everything from current session's history.

            See the documentation of :func:`%history` for the full details.

        raw, output : bool
            As :meth:`get_range`

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        for sess, s, e in extract_hist_ranges(rangestr):
            yield from self.get_range(sess, s, e, raw=raw, output=output)
@dataclass
class HistoryOutput:
    """One recorded output item, tagged with the kind of output it is."""

    # Which kind of output this item is; restricted to the four literals below.
    output_type: typing.Literal[
        "out_stream", "err_stream", "display_data", "execute_result"
    ]
    # String keys mapped to a string or list of strings.
    # NOTE(review): presumably a MIME bundle keyed by MIME type - confirm with callers.
    bundle: typing.Dict[str, str | list[str]]
class HistoryManager(HistoryAccessor):
    """A class to organize all history-related functionality in one place."""

    # Public interface

    # An instance of the IPython shell we are attached to
    shell = Instance(
        "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False
    )
    # Lists to hold processed and raw history. These start with a blank entry
    # so that we can index them starting from 1
    input_hist_parsed = List([""])
    input_hist_raw = List([""])
    # A list of directories visited during session
    dir_hist: List = List()

    @default("dir_hist")
    def _dir_hist_default(self) -> list[Path]:
        """Start the directory history at the cwd, or empty if it is unavailable."""
        try:
            return [Path.cwd()]
        except OSError:
            return []

    # A dict of output history, keyed with ints from the shell's
    # execution count.
    output_hist = Dict()
    # The text/plain repr of outputs.
    output_hist_reprs: typing.Dict[int, str] = Dict()  # type: ignore [assignment]
    # Maps execution_count to MIME bundles.
    # The class-level value is kept for backward compatibility only; every
    # instance gets its own mapping in __init__ so managers do not share state.
    outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list)
    # Maps execution_count to exception tracebacks
    exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict()  # type: ignore [assignment]

    # The number of the current session in the history database
    session_number: int = Integer()  # type: ignore [assignment]

    db_log_output = Bool(
        False, help="Should the history database include output? (default: no)"
    ).tag(config=True)
    db_cache_size = Integer(
        0,
        help="Write to database every x commands (higher values save disk access & power).\n"
        "Values of 1 or less effectively disable caching.",
    ).tag(config=True)
    # The input and output caches
    db_input_cache: List[tuple[int, str, str]] = List()
    db_output_cache: List[tuple[int, str]] = List()

    # History saving in separate thread
    save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True)

    @property
    def save_flag(self) -> threading.Event | None:
        """The saving thread's ``save_flag`` event, or None without a thread."""
        if self.save_thread is not None:
            return self.save_thread.save_flag
        return None

    # Private interface
    # Variables used to store the three last inputs from the user.  On each new
    # history update, we populate the user's namespace with these, shifted as
    # necessary.
    _i00 = Unicode("")
    _i = Unicode("")
    _ii = Unicode("")
    _iii = Unicode("")

    # A regex matching all forms of the exit command, so that we don't store
    # them in the history (it's annoying to rewind the first entry and land on
    # an exit call).
    _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$")

    _instances: WeakSet[HistoryManager] = WeakSet()
    _max_inst: int | float = float("inf")

    def __init__(
        self,
        shell: InteractiveShell,
        config: Optional[Configuration] = None,
        **traits: typing.Any,
    ):
        """Create a new history manager associated with a shell instance."""
        super().__init__(shell=shell, config=config, **traits)
        # Own mapping per manager; the class-level default would otherwise be
        # one dict shared (and cleared by reset()) across every instance.
        self.outputs = defaultdict(list)
        self.db_input_cache_lock = threading.Lock()
        self.db_output_cache_lock = threading.Lock()

        try:
            self.new_session()
        except OperationalError:
            self.log.error(
                "Failed to create history session in %s. History will not be saved.",
                self.hist_file,
                exc_info=True,
            )
            self.hist_file = ":memory:"

        if self.enabled and self.hist_file != ":memory:":
            self.save_thread = HistorySavingThread(self)
            try:
                self.save_thread.start()
            except RuntimeError:
                self.log.error(
                    "Failed to start history saving thread. History will not be saved.",
                    exc_info=True,
                )
                self.hist_file = ":memory:"
        self._instances.add(self)
        assert len(HistoryManager._instances) <= HistoryManager._max_inst, (
            len(HistoryManager._instances),
            HistoryManager._max_inst,
        )

    def __del__(self) -> None:
        if self.save_thread is not None:
            self.save_thread.stop()

    def _get_hist_file_name(self, profile: Optional[str] = None) -> Path:
        """Get default history file name based on the Shell's profile.

        The profile parameter is ignored, but must exist for compatibility with
        the parent class."""
        profile_dir = self.shell.profile_dir.location
        return Path(profile_dir) / "history.sqlite"

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current time as a timezone-aware UTC ISO string.

        Shared by new_session and end_session so that a session's start and
        end are always written in the same time base.
        """
        return datetime.datetime.now(datetime.timezone.utc).isoformat(" ")

    @only_when_enabled
    def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None:
        """Get a new session number.

        Inserts a row into ``sessions`` (using ``conn`` if given, else
        ``self.db``) and stores its row id in ``self.session_number``.
        """
        if conn is None:
            conn = self.db

        with conn:
            cur = conn.execute(
                """INSERT INTO sessions VALUES (NULL, ?, NULL,
                            NULL, '') """,
                (self._utc_timestamp(),),
            )
            assert isinstance(cur.lastrowid, int)
            self.session_number = cur.lastrowid

    def end_session(self) -> None:
        """Close the database session, filling in the end time and line count."""
        self.writeout_cache()
        with self.db:
            self.db.execute(
                """UPDATE sessions SET end=?, num_cmds=? WHERE
                            session==?""",
                (
                    self._utc_timestamp(),
                    len(self.input_hist_parsed) - 1,
                    self.session_number,
                ),
            )
        self.session_number = 0

    def name_session(self, name: str) -> None:
        """Give the current session a name in the history database."""
        warn(
            "name_session is deprecated in IPython 9.0 and will be removed in future versions",
            DeprecationWarning,
            stacklevel=2,
        )
        with self.db:
            self.db.execute(
                "UPDATE sessions SET remark=? WHERE session==?",
                (name, self.session_number),
            )

    def reset(self, new_session: bool = True) -> None:
        """Clear the session history, releasing all object references, and
        optionally open a new session."""
        self.output_hist.clear()
        self.outputs.clear()
        self.exceptions.clear()

        # The directory history should start at the cwd; if the cwd cannot be
        # determined (e.g. it was deleted), fall back to an empty history the
        # same way _dir_hist_default does instead of failing the reset.
        try:
            self.dir_hist[:] = [Path.cwd()]
        except OSError:
            self.dir_hist[:] = []

        if new_session:
            if self.session_number:
                self.end_session()
            self.input_hist_parsed[:] = [""]
            self.input_hist_raw[:] = [""]
            self.new_session()

    # ------------------------------
    # Methods for retrieving history
    # ------------------------------
    def get_session_info(
        self, session: int = 0
    ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
        """Get info about a session.

        Parameters
        ----------
        session : int
            Session number to retrieve. The current session is 0, and negative
            numbers count back from current session, so -1 is the previous session.

        Returns
        -------
        session_id : int
            Session ID number
        start : datetime
            Timestamp for the start of the session.
        end : datetime
            Timestamp for the end of the session, or None if IPython crashed.
        num_cmds : int
            Number of commands run, or None if IPython crashed.
        remark : str
            A manually set description.
        """
        if session <= 0:
            session += self.session_number

        return super(HistoryManager, self).get_session_info(session=session)

    @catch_corrupt_db
    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Get the last n lines from the history database.

        Most recent entry last.

        Completion will be reordered so that the last ones are, when
        possible, from the current session.

        Parameters
        ----------
        n : int
            The number of lines to get
        raw, output : bool
            See :meth:`get_range`
        include_latest : bool
            If False (default), n+1 lines are fetched, and the latest one
            is discarded. This is intended to be used where the function
            is called by a user command, which it should not return.

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        self.writeout_cache()
        if not include_latest:
            n += 1
        # cursor/line/entry
        this_cur = list(
            self._run_sql(
                "WHERE session == ? ORDER BY line DESC LIMIT ?  ",
                (self.session_number, n),
                raw=raw,
                output=output,
            )
        )
        other_cur = list(
            self._run_sql(
                "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?",
                (self.session_number, n),
                raw=raw,
                output=output,
            )
        )

        # current-session rows first, then older sessions; both newest-first
        everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur

        everything = everything[:n]

        if not include_latest:
            # reverse to oldest-first while dropping index 0 (the latest entry)
            return list(everything)[:0:-1]
        return list(everything)[::-1]

    def _get_range_session(
        self,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Get input and output history from the current session. Called by
        get_range, and takes similar parameters."""
        input_hist = self.input_hist_raw if raw else self.input_hist_parsed

        n = len(input_hist)
        if start < 0:
            start += n
        if not stop or (stop > n):
            stop = n
        elif stop < 0:
            stop += n
        line: InOrInOut
        for i in range(start, stop):
            if output:
                line = (input_hist[i], self.output_hist_reprs.get(i))
            else:
                line = input_hist[i]
            yield (0, i, line)

    def get_range(
        self,
        session: int = 0,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Retrieve input by session.

        Parameters
        ----------
        session : int
            Session number to retrieve. The current session is 0, and negative
            numbers count back from current session, so -1 is previous session.
        start : int
            First line to retrieve.
        stop : int
            End of line range (excluded from output itself). If None, retrieve
            to the end of the session.
        raw : bool
            If True, return untranslated input
        output : bool
            If True, attempt to include output. This will be 'real' Python
            objects for the current session, or text reprs from previous
            sessions if db_log_output was enabled at the time. Where no output
            is found, None is used.

        Returns
        -------
        entries
            An iterator over the desired lines. Each line is a 3-tuple, either
            (session, line, input) if output is False, or
            (session, line, (input, output)) if output is True.
        """
        if session <= 0:
            session += self.session_number
        if session == self.session_number:  # Current session
            return self._get_range_session(start, stop, raw, output)
        return super(HistoryManager, self).get_range(session, start, stop, raw, output)

    ## ----------------------------
    ## Methods for storing history:
    ## ----------------------------
    def store_inputs(
        self, line_num: int, source: str, source_raw: Optional[str] = None
    ) -> None:
        """Store source and raw input in history and create input cache
        variables ``_i*``.

        Parameters
        ----------
        line_num : int
            The prompt number of this input.
        source : str
            Python input.
        source_raw : str, optional
            If given, this is the raw input without any IPython transformations
            applied to it.  If not given, ``source`` is used.
        """
        if source_raw is None:
            source_raw = source
        source = source.rstrip("\n")
        source_raw = source_raw.rstrip("\n")

        # do not store exit/quit commands
        if self._exit_re.match(source_raw.strip()):
            return

        self.input_hist_parsed.append(source)
        self.input_hist_raw.append(source_raw)

        with self.db_input_cache_lock:
            self.db_input_cache.append((line_num, source, source_raw))
            # Trigger to flush cache and write to DB.
            if len(self.db_input_cache) >= self.db_cache_size:
                flag = self.save_flag
                if flag is not None:
                    flag.set()

        # update the auto _i variables
        self._iii = self._ii
        self._ii = self._i
        self._i = self._i00
        self._i00 = source_raw

        # hackish access to user namespace to create _i1,_i2... dynamically
        new_i = "_i%s" % line_num
        to_main = {"_i": self._i, "_ii": self._ii, "_iii": self._iii, new_i: self._i00}

        if self.shell is not None:
            self.shell.push(to_main, interactive=False)

    def store_output(self, line_num: int) -> None:
        """If database output logging is enabled, this saves all the
        outputs from the indicated prompt number to the database. It's
        called by run_cell after code has been executed.

        Parameters
        ----------
        line_num : int
            The line number from which to save outputs
        """
        if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
            return
        output = self.output_hist_reprs[line_num]

        with self.db_output_cache_lock:
            self.db_output_cache.append((line_num, output))
        if self.db_cache_size <= 1 and self.save_flag is not None:
            self.save_flag.set()

    def _writeout_input_cache(self, conn: sqlite3.Connection) -> None:
        """Insert every cached input row into ``history`` in one transaction."""
        with conn:
            for line in self.db_input_cache:
                conn.execute(
                    "INSERT INTO history VALUES (?, ?, ?, ?)",
                    (self.session_number,) + line,
                )

    def _writeout_output_cache(self, conn: sqlite3.Connection) -> None:
        """Insert every cached output row into ``output_history`` in one transaction."""
        with conn:
            for line in self.db_output_cache:
                conn.execute(
                    "INSERT INTO output_history VALUES (?, ?, ?)",
                    (self.session_number,) + line,
                )

    @only_when_enabled
    def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
        """Write any entries in the cache to the database."""
        if conn is None:
            conn = self.db

        with self.db_input_cache_lock:
            try:
                self._writeout_input_cache(conn)
            except sqlite3.IntegrityError:
                self.new_session(conn)
                print(
                    "ERROR! Session/line number was not unique in",
                    "database. History logging moved to new session",
                    self.session_number,
                )
                try:
                    # Try writing to the new session. If this fails, don't
                    # recurse
                    self._writeout_input_cache(conn)
                except sqlite3.IntegrityError:
                    pass
            finally:
                self.db_input_cache = []

        with self.db_output_cache_lock:
            try:
                self._writeout_output_cache(conn)
            except sqlite3.IntegrityError:
                print(
                    "!! Session/line number for output was not unique",
                    "in database. Output will not be stored.",
                )
            finally:
                self.db_output_cache = []
1061from typing import Callable, Iterator
1062from weakref import ReferenceType
@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that dereferences a weak reference on entry and keeps
    the resulting strong reference until the block exits, so the referent
    is not garbage collected while the context is active.

    The weak reference itself (not the referent) is what gets yielded.
    """
    keep_alive = ref()
    yield ref
    # Drop the strong reference once the with-block has finished.
    del keep_alive
class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for its ``save_flag`` event to be set, then asks the
    HistoryManager to write out the history cache. The main thread is
    responsible for setting the flag when the cache size reaches a defined
    threshold.

    Only a weak reference to the HistoryManager is stored, so this thread
    does not by itself keep the manager alive.
    """

    # Set to wake the thread up, either to flush the cache or to stop.
    save_flag: threading.Event
    daemon: bool = True
    # Request for the run loop to exit the next time it wakes up.
    _stop_now: bool = False
    # Copied from the history manager at construction time.
    enabled: bool = True
    history_manager: ref[HistoryManager]
    # Set once stop() has run, so later calls are no-ops.
    _stopped: bool = False

    def __init__(self, history_manager: HistoryManager) -> None:
        """Create the (not yet started) thread for ``history_manager``."""
        super().__init__(name="IPythonHistorySavingThread")
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled
        self.save_flag = threading.Event()

    @only_when_enabled
    def run(self) -> None:
        """Flush the manager's cache every time ``save_flag`` is set.

        The loop ends when ``stop()`` has been requested or when the
        history manager has been garbage collected. Unexpected errors are
        reported on stdout instead of propagating out of the thread.
        """
        atexit.register(self.stop)
        # We need a separate db connection per thread:
        try:
            hm: ReferenceType[HistoryManager]
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone: there is nothing to save
                    # and no file to connect to.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **hm().connection_options,  # type: ignore [union-attr]
                )
            try:
                while True:
                    self.save_flag.wait()
                    # NOTE: the manager is only reached through ``hm()``
                    # and never bound to a local name, so that this frame
                    # does not keep it alive while waiting on the flag.
                    with hold(self.history_manager) as hm:
                        if hm() is None:
                            self._stop_now = True
                        if self._stop_now:
                            return
                        self.save_flag.clear()
                        if hm() is not None:
                            hm().writeout_cache(self.db)  # type: ignore [union-attr]
            finally:
                # Close the connection on every exit path, including
                # unexpected errors raised while writing.
                self.db.close()
        except Exception as e:
            print(
                (
                    "The history saving thread hit an unexpected error (%s). "
                    "History will not be written to the database."
                )
                % repr(e)
            )
        finally:
            atexit.unregister(self.stop)

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method."""
        if self._stopped:
            return
        self._stop_now = True

        # Wake the run loop so that it notices the stop request.
        self.save_flag.set()
        self._stopped = True
        # join() raises RuntimeError for a thread that was never started
        # (ident is still None) and for the current thread, so skip both.
        if self.ident is not None and self is not threading.current_thread():
            self.join()

    def __del__(self) -> None:
        self.stop()
# To match, e.g. ~5/8-~2/3
#
# Groups, all optional:
#   startsess  session of the first line, digits optionally prefixed by "~",
#              followed by "/"
#   start      first line number
#   sep        "-" or ":" ; when present, "end" is required
#   endsess    session of the last line, same form as startsess
#   end        last line number
range_re = re.compile(
    r"""
((?P<startsess>~?\d+)/)?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>~?\d+)/)?
 (?P<end>\d+))?
$""",
    re.VERBOSE,
)


def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    ``ranges_str`` is split on whitespace. Items that do not look like a
    range, or that give neither a start line nor a start session, are
    skipped. ``stop`` is exclusive; ``None`` means "until the end of the
    session". A ``~N`` session is returned as ``-N``. With the ``-``
    separator the end line is included, with ``:`` it is not. When the start
    line is omitted it defaults to 1.

    Raises
    ------
    AssertionError
        If the end session of a range comes before its start session. This
        is raised lazily, while iterating over the result.

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    >>> list(extract_hist_ranges("~5/-3"))
    [(-5, 1, 4)]
    """
    if ranges_str == "":
        yield (0, 1, None)  # Everything from current session
        return

    for range_str in ranges_str.split():
        rmatch = range_re.match(range_str)
        if not rmatch:
            continue

        start_str = rmatch.group("start")
        end_str = rmatch.group("end")
        start: int
        end: Optional[int]
        if start_str:
            start = int(start_str)
            # If no end specified, get (a, a + 1)
            end = int(end_str) if end_str else start + 1
        else:  # start not specified
            if not rmatch.group("startsess"):  # no startsess
                continue
            start = 1
            # Without an explicit end, provide the entire session hist;
            # otherwise honour the end the user typed (e.g. "~5/-3").
            end = int(end_str) if end_str else None

        # 1-3 == 1:4 --> [1, 2, 3]
        # (the pattern guarantees an end whenever a separator is present)
        if rmatch.group("sep") == "-" and end is not None:
            end += 1

        startsess_str = rmatch.group("startsess") or "0"
        endsess_str = rmatch.group("endsess") or startsess_str
        startsess = int(startsess_str.replace("~", "-"))
        endsess = int(endsess_str.replace("~", "-"))
        if endsess < startsess:
            # Raised explicitly (rather than with ``assert``) so the check
            # also runs under ``python -O``; the exception type is kept for
            # callers that already handle it.
            raise AssertionError("start session must be earlier than end session")

        if endsess == startsess:
            yield (startsess, start, end)
            continue
        # Multiple sessions in one range:
        yield (startsess, start, None)
        for sess in range(startsess + 1, endsess):
            yield (sess, 1, None)
        yield (endsess, 1, end)
def _format_lineno(session: int, line: int) -> str:
    """Return the display label for a history line.

    Lines of session 0 are shown as the bare line number; any other session
    is shown as ``<session>#<line>``.
    """
    return str(line) if session == 0 else "%s#%s" % (session, line)