Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""History related magics and functionality"""
3from __future__ import annotations
5# Copyright (c) IPython Development Team.
6# Distributed under the terms of the Modified BSD License.
9import atexit
10import datetime
11import os
12import re
15import threading
16from pathlib import Path
18from collections import defaultdict
19from contextlib import contextmanager
20from dataclasses import dataclass
21from decorator import decorator
22from traitlets import (
23 Any,
24 Bool,
25 Dict,
26 Instance,
27 Integer,
28 List,
29 TraitError,
30 Unicode,
31 Union,
32 default,
33 observe,
34)
35from traitlets.config.configurable import LoggingConfigurable
37from IPython.paths import locate_profile
38from IPython.utils.decorators import undoc
39from typing import Tuple, Optional, TYPE_CHECKING
40from collections.abc import Iterable
41import typing
42from warnings import warn
43from weakref import ref, WeakSet
45if TYPE_CHECKING:
46 from IPython.core.interactiveshell import InteractiveShell
47 from IPython.config.Configuration import Configuration
# sqlite3 may be missing from some Python builds.  When it cannot be imported,
# define placeholder exception types so the ``except`` clauses further down in
# this module still refer to valid names.
try:
    import sqlite3
    from sqlite3 import DatabaseError, OperationalError
except ModuleNotFoundError:
    sqlite3_found = False

    class DatabaseError(Exception):  # type: ignore [no-redef]
        """Placeholder for ``sqlite3.DatabaseError`` when sqlite3 is missing."""

    class OperationalError(Exception):  # type: ignore [no-redef]
        """Placeholder for ``sqlite3.OperationalError`` when sqlite3 is missing."""

else:
    # Columns declared as "timestamp" are stored as ISO 8601 text; decode them
    # back into datetime objects on connections opened with detect_types.
    sqlite3.register_converter(
        "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode())
    )

    sqlite3_found = True
# Payload of one history entry: either the input source alone, or an
# (input, output) pair in which the output may be absent.
InOrInOut = typing.Union[str, tuple[str, typing.Optional[str]]]
70# -----------------------------------------------------------------------------
71# Classes and functions
72# -----------------------------------------------------------------------------
@undoc
class DummyDB:
    """Dummy DB that will act as a black hole for history.

    Only used in the absence of sqlite.  It provides the small part of the
    ``sqlite3.Connection`` interface this module relies on: ``execute``,
    ``commit`` and use as a context manager.
    """

    def execute(self, *args: typing.Any, **kwargs: typing.Any) -> list:
        """Ignore the statement and report an empty result set."""
        return []

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Do nothing; there is never anything to commit."""

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Enter a no-op transaction.

        Returns this object, as a real connection does, so that
        ``with db as conn:`` binds something usable.
        """
        return self

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Leave the no-op transaction; exceptions are never suppressed."""
@decorator
def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: run the wrapped method only while ``self.enabled`` is true.

    When ``self.enabled`` is false the wrapped method is not called and an
    empty list is returned in its place.
    """
    if self.enabled:
        return f(self, *a, **kw)
    return []
# use 16kB as threshold for whether a corrupt history db should be saved
# that should be at least 100 entries or so
_SAVE_DB_SIZE = 16384


@decorator
def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """A decorator which wraps HistoryAccessor method calls to catch errors from
    a corrupt SQLite database, move the old database out of the way, and create
    a new one.

    We avoid clobbering larger databases because this may be triggered due to filesystem issues,
    not just a corrupt file.

    On ``DatabaseError``/``OperationalError`` the failure is counted and logged.
    Once the count exceeds ``self._corrupt_db_limit`` the accessor switches to
    ``":memory:"``; before that, an existing history file is renamed to a
    ``-corrupt`` sibling.  ``self.init_db()`` is then called and ``[]`` is
    returned in place of the wrapped method's result.  If the failure happens
    while already using ``":memory:"`` the exception is re-raised.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if self.hist_file == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # hist_file may hold either a Path or a plain string; normalise it
            # so the pathlib calls below work for both.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                base = str(hist_path.parent / hist_path.stem)
                ext = hist_path.suffix
                size = hist_path.stat().st_size
                if size >= _SAVE_DB_SIZE:
                    # if there's significant content, avoid clobbering
                    now = (
                        datetime.datetime.now(datetime.timezone.utc)
                        .isoformat()
                        .replace(":", ".")
                    )
                    newpath = base + "-corrupt-" + now + ext
                    # don't clobber previous corrupt backups
                    for i in range(100):
                        if not Path(newpath).exists():
                            break
                        newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
                else:
                    # not much content, possibly empty; don't worry about clobbering
                    # maybe we should just delete it?
                    newpath = base + "-corrupt" + ext
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []
class HistoryAccessorBase(LoggingConfigurable):
    """An abstract class for History Accessors.

    Every method here raises ``NotImplementedError``; subclasses supply the
    actual lookups.
    """

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the last ``n`` history entries; must be overridden."""
        raise NotImplementedError

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return history entries matching ``pattern``; must be overridden."""
        raise NotImplementedError

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return a range of lines from one session; must be overridden."""
        raise NotImplementedError

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the lines named by a range string; must be overridden."""
        raise NotImplementedError
class HistoryAccessor(HistoryAccessorBase):
    """Access the history database without adding to it.

    This is intended for use by standalone history tools. IPython shells use
    HistoryManager, below, which is a subclass of this."""

    # counter for init_db retries, so we don't keep trying over and over
    _corrupt_db_counter = 0
    # after two failures, fallback on :memory:
    _corrupt_db_limit = 2

    # String holding the path to the history file
    hist_file = Union(
        [Instance(Path), Unicode()],
        help="""Path to file to use for SQLite history database.

        By default, IPython will put the history database in the IPython
        profile directory. If you would rather share one history among
        profiles, you can set this value in each, so that they are consistent.

        Due to an issue with fcntl, SQLite is known to misbehave on some NFS
        mounts. If you see IPython hanging, try setting this to something on a
        local disk, e.g::

            ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite

        you can also use the specific value `:memory:` (including the colon
        at both end but not the back ticks), to avoid creating an history file.

        """,
    ).tag(config=True)

    enabled = Bool(
        sqlite3_found,
        help="""enable the SQLite history

        set enabled=False to disable the SQLite history,
        in which case there will be no stored history, no SQLite connection,
        and no background saving thread. This may be necessary in some
        threaded environments where IPython is embedded.
        """,
    ).tag(config=True)

    connection_options = Dict(
        help="""Options for configuring the SQLite connection

        These options are passed as keyword args to sqlite3.connect
        when establishing database connections.
        """
    ).tag(config=True)

    @default("connection_options")
    def _default_connection_options(self) -> dict[str, bool]:
        """Default ``sqlite3.connect`` options: no same-thread check."""
        return {"check_same_thread": False}

    # The SQLite database
    db = Any()

    @observe("db")
    @only_when_enabled
    def _db_changed(self, change):  # type: ignore [no-untyped-def]
        """validate the db, since it can be an Instance of two different types"""
        new = change["new"]
        connection_types = (DummyDB, sqlite3.Connection)
        if not isinstance(new, connection_types):
            msg = (
                f"{self.__class__.__name__}.db must be sqlite3 Connection "
                f"or DummyDB, not {new!r}"
            )
            raise TraitError(msg)

    def __init__(
        self, profile: str = "default", hist_file: str = "", **traits: typing.Any
    ) -> None:
        """Create a new history accessor.

        Parameters
        ----------
        profile : str
            The name of the profile from which to open history.
        hist_file : str
            Path to an SQLite history database stored by IPython. If specified,
            hist_file overrides profile.
        config : :class:`~traitlets.config.loader.Config`
            Config object. hist_file can also be set through this.
        """
        super().__init__(**traits)
        # defer setting hist_file from kwarg until after init,
        # otherwise the default kwarg value would clobber any value
        # set by config
        if hist_file:
            self.hist_file = hist_file

        try:
            self.hist_file
        except TraitError:
            # No one has set the hist_file, yet.
            self.hist_file = self._get_hist_file_name(profile)

        self.init_db()

    def _get_hist_file_name(self, profile: str = "default") -> Path:
        """Find the history file for the given profile name.

        This is overridden by the HistoryManager subclass, to use the shell's
        active profile.

        Parameters
        ----------
        profile : str
            The name of a profile which has a history file.
        """
        return Path(locate_profile(profile)) / "history.sqlite"

    @catch_corrupt_db
    def init_db(self) -> None:
        """Connect to the database, and create tables if necessary."""
        if not self.enabled:
            self.db = DummyDB()
            return

        # use detect_types so that timestamps return datetime objects;
        # user-configured connection options take precedence.
        kwargs = {
            "detect_types": sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
            **self.connection_options,
        }
        self.db = sqlite3.connect(str(self.hist_file), **kwargs)  # type: ignore [call-overload]
        with self.db:
            self.db.execute(
                """CREATE TABLE IF NOT EXISTS sessions (session integer
                primary key autoincrement, start timestamp,
                end timestamp, num_cmds integer, remark text)"""
            )
            self.db.execute(
                """CREATE TABLE IF NOT EXISTS history
                (session integer, line integer, source text, source_raw text,
                PRIMARY KEY (session, line))"""
            )
            # Output history is optional, but ensure the table's there so it can be
            # enabled later.
            self.db.execute(
                """CREATE TABLE IF NOT EXISTS output_history
                (session integer, line integer, output text,
                PRIMARY KEY (session, line))"""
            )
        # success! reset corrupt db count
        self._corrupt_db_counter = 0

    def writeout_cache(self) -> None:
        """Overridden by HistoryManager to dump the cache before certain
        database lookups."""
        pass

    ## -------------------------------
    ## Methods for retrieving history:
    ## -------------------------------
    def _run_sql(
        self,
        sql: str,
        params: tuple,
        raw: bool = True,
        output: bool = False,
        latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Prepares and runs an SQL query for the history database.

        Parameters
        ----------
        sql : str
            Any filtering expressions to go after SELECT ... FROM ...
        params : tuple
            Parameters passed to the SQL query (to replace "?")
        raw, output : bool
            See :meth:`get_range`
        latest : bool
            Select rows with max (session, line)

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        toget = "source_raw" if raw else "source"
        sqlfrom = "history"
        if output:
            sqlfrom = "history LEFT JOIN output_history USING (session, line)"
            toget = f"history.{toget}, output_history.output"
        if latest:
            toget += ", MAX(session * 128 * 1024 + line)"
        query = f"SELECT session, line, {toget} FROM {sqlfrom} " + sql
        cur = self.db.execute(query, params)
        if latest:
            # Drop the trailing MAX(...) column, which was only selected to
            # pick the row.
            cur = (row[:-1] for row in cur)
        if output:
            # Regroup the four columns into (session, line, (input, output)).
            return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur)
        return cur

    @only_when_enabled
    @catch_corrupt_db
    def get_session_info(
        self, session: int
    ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
        """Get info about a session.

        Parameters
        ----------
        session : int
            Session number to retrieve.

        Returns
        -------
        session_id : int
            Session ID number
        start : datetime
            Timestamp for the start of the session.
        end : datetime
            Timestamp for the end of the session, or None if IPython crashed.
        num_cmds : int
            Number of commands run, or None if IPython crashed.
        remark : str
            A manually set description.

        Notes
        -----
        The result is whatever ``fetchone()`` yields, so it is None when no
        row matches ``session``.  When history is disabled the decorator
        returns an empty list instead.
        """
        query = "SELECT * from sessions where session == ?"
        return self.db.execute(query, (session,)).fetchone()

    @catch_corrupt_db
    def get_last_session_id(self) -> Optional[int]:
        """Get the last session ID currently in the database.

        Within IPython, this should be the same as the value stored in
        :attr:`HistoryManager.session_number`.
        """
        for record in self.get_tail(n=1, include_latest=True):
            return record[0]
        return None

    @catch_corrupt_db
    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Get the last n lines from the history database.

        Parameters
        ----------
        n : int
            The number of lines to get
        raw, output : bool
            See :meth:`get_range`
        include_latest : bool
            If False (default), n+1 lines are fetched, and the latest one
            is discarded. This is intended to be used where the function
            is called by a user command, which it should not return.

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        self.writeout_cache()
        if not include_latest:
            n += 1
        cur = self._run_sql(
            "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output
        )
        if not include_latest:
            # Rows arrive newest first, so the first one is the latest entry.
            return reversed(list(cur)[1:])
        return reversed(list(cur))

    @catch_corrupt_db
    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Search the database using unix glob-style matching (wildcards
        * and ?).

        Parameters
        ----------
        pattern : str
            The wildcarded pattern to match when searching
        search_raw : bool
            If True, search the raw input, otherwise, the parsed input
        raw, output : bool
            See :meth:`get_range`
        n : None or int
            If an integer is given, it defines the limit of
            returned entries.
        unique : bool
            When it is true, return only unique entries.

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        tosearch = "source_raw" if search_raw else "source"
        if output:
            # The query joins two tables, so qualify the column name.
            tosearch = "history." + tosearch
        self.writeout_cache()
        sqlform = f"WHERE {tosearch} GLOB ?"
        params: tuple[typing.Any, ...] = (pattern,)
        if unique:
            sqlform += f" GROUP BY {tosearch}"
        if n is not None:
            sqlform += " ORDER BY session DESC, line DESC LIMIT ?"
            params += (n,)
        elif unique:
            sqlform += " ORDER BY session, line"
        cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique)
        if n is not None:
            # The limited query runs newest first; hand results back oldest first.
            return reversed(list(cur))
        return cur

    @catch_corrupt_db
    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Retrieve input by session.

        Parameters
        ----------
        session : int
            Session number to retrieve.
        start : int
            First line to retrieve.
        stop : int
            End of line range (excluded from output itself). If None, retrieve
            to the end of the session.
        raw : bool
            If True, return untranslated input
        output : bool
            If True, attempt to include output. This will be 'real' Python
            objects for the current session, or text reprs from previous
            sessions if db_log_output was enabled at the time. Where no output
            is found, None is used.

        Returns
        -------
        entries
            An iterator over the desired lines. Each line is a 3-tuple, either
            (session, line, input) if output is False, or
            (session, line, (input, output)) if output is True.
        """
        params: tuple[typing.Any, ...]
        # Any falsy stop (None, but also 0) means "to the end of the session".
        if stop:
            lineclause = "line >= ? AND line < ?"
            params = (session, start, stop)
        else:
            lineclause = "line>=?"
            params = (session, start)

        return self._run_sql(
            f"WHERE session==? AND {lineclause}", params, raw=raw, output=output
        )

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Get lines of history from a string of ranges, as used by magic
        commands %hist, %save, %macro, etc.

        Parameters
        ----------
        rangestr : str
            A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used,
            this will return everything from current session's history.

            See the documentation of :func:`%history` for the full details.

        raw, output : bool
            As :meth:`get_range`

        Returns
        -------
        Tuples as :meth:`get_range`
        """
        for sess, s, e in extract_hist_ranges(rangestr):
            yield from self.get_range(sess, s, e, raw=raw, output=output)
@dataclass
class HistoryOutput:
    """A single recorded output: its kind and its data bundle."""

    # Which kind of output this record holds.
    output_type: typing.Literal[
        "out_stream", "err_stream", "display_data", "execute_result"
    ]
    # String keys mapped to a string or a list of strings
    # (presumably keyed by MIME type -- confirm against the producers).
    bundle: typing.Dict[str, str | list[str]]
598class HistoryManager(HistoryAccessor):
599 """A class to organize all history-related functionality in one place."""
601 # Public interface
603 # An instance of the IPython shell we are attached to
604 shell = Instance(
605 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False
606 )
607 # Lists to hold processed and raw history. These start with a blank entry
608 # so that we can index them starting from 1
609 input_hist_parsed = List([""])
610 input_hist_raw = List([""])
611 # A list of directories visited during session
612 dir_hist: List = List()
614 @default("dir_hist")
615 def _dir_hist_default(self) -> list[Path]:
616 try:
617 return [Path.cwd()]
618 except OSError:
619 return []
621 # A dict of output history, keyed with ints from the shell's
622 # execution count.
623 output_hist = Dict()
624 # The text/plain repr of outputs.
625 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment]
626 # Maps execution_count to MIME bundles
627 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list)
628 # Maps execution_count to exception tracebacks
629 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment]
631 # The number of the current session in the history database
632 session_number: int = Integer() # type: ignore [assignment]
634 db_log_output = Bool(
635 False, help="Should the history database include output? (default: no)"
636 ).tag(config=True)
637 db_cache_size = Integer(
638 0,
639 help="Write to database every x commands (higher values save disk access & power).\n"
640 "Values of 1 or less effectively disable caching.",
641 ).tag(config=True)
642 # The input and output caches
643 db_input_cache: List[tuple[int, str, str]] = List()
644 db_output_cache: List[tuple[int, str]] = List()
646 # History saving in separate thread
647 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True)
649 @property
650 def save_flag(self) -> threading.Event | None:
651 if self.save_thread is not None:
652 return self.save_thread.save_flag
653 return None
655 # Private interface
656 # Variables used to store the three last inputs from the user. On each new
657 # history update, we populate the user's namespace with these, shifted as
658 # necessary.
659 _i00 = Unicode("")
660 _i = Unicode("")
661 _ii = Unicode("")
662 _iii = Unicode("")
664 # A regex matching all forms of the exit command, so that we don't store
665 # them in the history (it's annoying to rewind the first entry and land on
666 # an exit call).
667 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$")
669 _instances: WeakSet[HistoryManager] = WeakSet()
670 _max_inst: int | float = float("inf")
672 def __init__(
673 self,
674 shell: InteractiveShell,
675 config: Optional[Configuration] = None,
676 **traits: typing.Any,
677 ):
678 """Create a new history manager associated with a shell instance."""
679 super().__init__(shell=shell, config=config, **traits)
680 self.db_input_cache_lock = threading.Lock()
681 self.db_output_cache_lock = threading.Lock()
683 try:
684 self.new_session()
685 except OperationalError:
686 self.log.error(
687 "Failed to create history session in %s. History will not be saved.",
688 self.hist_file,
689 exc_info=True,
690 )
691 self.hist_file = ":memory:"
693 self.using_thread = False
694 if self.enabled and self.hist_file != ":memory:":
695 self.save_thread = HistorySavingThread(self)
696 try:
697 self.save_thread.start()
698 except RuntimeError:
699 self.log.error(
700 "Failed to start history saving thread. History will not be saved.",
701 exc_info=True,
702 )
703 self.hist_file = ":memory:"
704 else:
705 self.using_thread = True
706 self._instances.add(self)
707 assert len(HistoryManager._instances) <= HistoryManager._max_inst, (
708 len(HistoryManager._instances),
709 HistoryManager._max_inst,
710 )
712 def __del__(self) -> None:
713 if self.save_thread is not None:
714 self.save_thread.stop()
716 @classmethod
717 def _stop_thread(cls) -> None:
718 # Used before forking so the thread isn't running at fork
719 for inst in cls._instances:
720 if inst.save_thread is not None:
721 inst.save_thread.stop()
722 inst.save_thread = None
724 def _restart_thread_if_stopped(self) -> None:
725 # Start the thread again after it was stopped for forking
726 if self.save_thread is None and self.using_thread:
727 self.save_thread = HistorySavingThread(self)
728 self.save_thread.start()
730 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path:
731 """Get default history file name based on the Shell's profile.
733 The profile parameter is ignored, but must exist for compatibility with
734 the parent class."""
735 profile_dir = self.shell.profile_dir.location
736 return Path(profile_dir) / "history.sqlite"
738 @only_when_enabled
739 def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None:
740 """Get a new session number."""
741 if conn is None:
742 conn = self.db
744 with conn:
745 cur = conn.execute(
746 """INSERT INTO sessions VALUES (NULL, ?, NULL,
747 NULL, '') """,
748 (datetime.datetime.now().isoformat(" "),),
749 )
750 assert isinstance(cur.lastrowid, int)
751 self.session_number = cur.lastrowid
753 def end_session(self) -> None:
754 """Close the database session, filling in the end time and line count."""
755 self.writeout_cache()
756 with self.db:
757 self.db.execute(
758 """UPDATE sessions SET end=?, num_cmds=? WHERE
759 session==?""",
760 (
761 datetime.datetime.now(datetime.timezone.utc).isoformat(" "),
762 len(self.input_hist_parsed) - 1,
763 self.session_number,
764 ),
765 )
766 self.session_number = 0
768 def name_session(self, name: str) -> None:
769 """Give the current session a name in the history database."""
770 warn(
771 "name_session is deprecated in IPython 9.0 and will be removed in future versions",
772 DeprecationWarning,
773 stacklevel=2,
774 )
775 with self.db:
776 self.db.execute(
777 "UPDATE sessions SET remark=? WHERE session==?",
778 (name, self.session_number),
779 )
781 def reset(self, new_session: bool = True) -> None:
782 """Clear the session history, releasing all object references, and
783 optionally open a new session."""
784 self.output_hist.clear()
785 self.outputs.clear()
786 self.exceptions.clear()
788 # The directory history can't be completely empty
789 self.dir_hist[:] = [Path.cwd()]
791 if new_session:
792 if self.session_number:
793 self.end_session()
794 self.input_hist_parsed[:] = [""]
795 self.input_hist_raw[:] = [""]
796 self.new_session()
798 # ------------------------------
799 # Methods for retrieving history
800 # ------------------------------
801 def get_session_info(
802 self, session: int = 0
803 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
804 """Get info about a session.
806 Parameters
807 ----------
808 session : int
809 Session number to retrieve. The current session is 0, and negative
810 numbers count back from current session, so -1 is the previous session.
812 Returns
813 -------
814 session_id : int
815 Session ID number
816 start : datetime
817 Timestamp for the start of the session.
818 end : datetime
819 Timestamp for the end of the session, or None if IPython crashed.
820 num_cmds : int
821 Number of commands run, or None if IPython crashed.
822 remark : str
823 A manually set description.
824 """
825 if session <= 0:
826 session += self.session_number
828 return super(HistoryManager, self).get_session_info(session=session)
830 @catch_corrupt_db
831 def get_tail(
832 self,
833 n: int = 10,
834 raw: bool = True,
835 output: bool = False,
836 include_latest: bool = False,
837 ) -> Iterable[tuple[int, int, InOrInOut]]:
838 """Get the last n lines from the history database.
840 Most recent entry last.
842 Completion will be reordered so that that the last ones are when
843 possible from current session.
845 Parameters
846 ----------
847 n : int
848 The number of lines to get
849 raw, output : bool
850 See :meth:`get_range`
851 include_latest : bool
852 If False (default), n+1 lines are fetched, and the latest one
853 is discarded. This is intended to be used where the function
854 is called by a user command, which it should not return.
856 Returns
857 -------
858 Tuples as :meth:`get_range`
859 """
860 self.writeout_cache()
861 if not include_latest:
862 n += 1
863 # cursor/line/entry
864 this_cur = list(
865 self._run_sql(
866 "WHERE session == ? ORDER BY line DESC LIMIT ? ",
867 (self.session_number, n),
868 raw=raw,
869 output=output,
870 )
871 )
872 other_cur = list(
873 self._run_sql(
874 "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?",
875 (self.session_number, n),
876 raw=raw,
877 output=output,
878 )
879 )
881 everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur
883 everything = everything[:n]
885 if not include_latest:
886 return list(everything)[:0:-1]
887 return list(everything)[::-1]
889 def _get_range_session(
890 self,
891 start: int = 1,
892 stop: Optional[int] = None,
893 raw: bool = True,
894 output: bool = False,
895 ) -> Iterable[tuple[int, int, InOrInOut]]:
896 """Get input and output history from the current session. Called by
897 get_range, and takes similar parameters."""
898 input_hist = self.input_hist_raw if raw else self.input_hist_parsed
900 n = len(input_hist)
901 if start < 0:
902 start += n
903 if not stop or (stop > n):
904 stop = n
905 elif stop < 0:
906 stop += n
907 line: InOrInOut
908 for i in range(start, stop):
909 if output:
910 line = (input_hist[i], self.output_hist_reprs.get(i))
911 else:
912 line = input_hist[i]
913 yield (0, i, line)
915 def get_range(
916 self,
917 session: int = 0,
918 start: int = 1,
919 stop: Optional[int] = None,
920 raw: bool = True,
921 output: bool = False,
922 ) -> Iterable[tuple[int, int, InOrInOut]]:
923 """Retrieve input by session.
925 Parameters
926 ----------
927 session : int
928 Session number to retrieve. The current session is 0, and negative
929 numbers count back from current session, so -1 is previous session.
930 start : int
931 First line to retrieve.
932 stop : int
933 End of line range (excluded from output itself). If None, retrieve
934 to the end of the session.
935 raw : bool
936 If True, return untranslated input
937 output : bool
938 If True, attempt to include output. This will be 'real' Python
939 objects for the current session, or text reprs from previous
940 sessions if db_log_output was enabled at the time. Where no output
941 is found, None is used.
943 Returns
944 -------
945 entries
946 An iterator over the desired lines. Each line is a 3-tuple, either
947 (session, line, input) if output is False, or
948 (session, line, (input, output)) if output is True.
949 """
950 if session <= 0:
951 session += self.session_number
952 if session == self.session_number: # Current session
953 return self._get_range_session(start, stop, raw, output)
954 return super(HistoryManager, self).get_range(session, start, stop, raw, output)
956 ## ----------------------------
957 ## Methods for storing history:
958 ## ----------------------------
959 def store_inputs(
960 self, line_num: int, source: str, source_raw: Optional[str] = None
961 ) -> None:
962 """Store source and raw input in history and create input cache
963 variables ``_i*``.
965 Parameters
966 ----------
967 line_num : int
968 The prompt number of this input.
969 source : str
970 Python input.
971 source_raw : str, optional
972 If given, this is the raw input without any IPython transformations
973 applied to it. If not given, ``source`` is used.
974 """
975 if source_raw is None:
976 source_raw = source
977 source = source.rstrip("\n")
978 source_raw = source_raw.rstrip("\n")
980 # do not store exit/quit commands
981 if self._exit_re.match(source_raw.strip()):
982 return
984 self.input_hist_parsed.append(source)
985 self.input_hist_raw.append(source_raw)
987 with self.db_input_cache_lock:
988 self.db_input_cache.append((line_num, source, source_raw))
989 # Trigger to flush cache and write to DB.
990 if len(self.db_input_cache) >= self.db_cache_size:
991 if self.using_thread:
992 self._restart_thread_if_stopped()
993 if self.save_flag is not None:
994 self.save_flag.set()
996 # update the auto _i variables
997 self._iii = self._ii
998 self._ii = self._i
999 self._i = self._i00
1000 self._i00 = source_raw
1002 # hackish access to user namespace to create _i1,_i2... dynamically
1003 new_i = "_i%s" % line_num
1004 to_main = {"_i": self._i, "_ii": self._ii, "_iii": self._iii, new_i: self._i00}
1006 if self.shell is not None:
1007 self.shell.push(to_main, interactive=False)
1009 def store_output(self, line_num: int) -> None:
1010 """If database output logging is enabled, this saves all the
1011 outputs from the indicated prompt number to the database. It's
1012 called by run_cell after code has been executed.
1014 Parameters
1015 ----------
1016 line_num : int
1017 The line number from which to save outputs
1018 """
1019 if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
1020 return
1021 lnum: int = line_num
1022 output = self.output_hist_reprs[line_num]
1024 with self.db_output_cache_lock:
1025 self.db_output_cache.append((line_num, output))
1026 if self.db_cache_size <= 1 and self.using_thread:
1027 self._restart_thread_if_stopped()
1028 if self.save_flag is not None:
1029 self.save_flag.set()
1031 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None:
1032 with conn:
1033 for line in self.db_input_cache:
1034 conn.execute(
1035 "INSERT INTO history VALUES (?, ?, ?, ?)",
1036 (self.session_number,) + line,
1037 )
1039 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None:
1040 with conn:
1041 for line in self.db_output_cache:
1042 conn.execute(
1043 "INSERT INTO output_history VALUES (?, ?, ?)",
1044 (self.session_number,) + line,
1045 )
@only_when_enabled
def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Write any entries in the cache to the database.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to write through. When omitted (``None``), ``self.db``
        is used.

    Notes
    -----
    Each cache is reset to an empty list once its write attempt ends,
    whether or not the write succeeded. Only ``sqlite3.IntegrityError`` is
    handled here; any other exception propagates after the corresponding
    cache has been cleared.
    """
    if conn is None:
        conn = self.db

    # Inputs: written while holding the input-cache lock.
    with self.db_input_cache_lock:
        try:
            self._writeout_input_cache(conn)
        except sqlite3.IntegrityError:
            # The insert was rejected: switch to a new session, tell the
            # user, and retry exactly once under the new session number.
            self.new_session(conn)
            print(
                "ERROR! Session/line number was not unique in",
                "database. History logging moved to new session",
                self.session_number,
            )
            try:
                # Try writing to the new session. If this fails, don't
                # recurse
                self._writeout_input_cache(conn)
            except sqlite3.IntegrityError:
                # Deliberate best effort: these inputs are dropped.
                pass
        finally:
            # Always drop the cached inputs, even if they were not written.
            self.db_input_cache = []

    # Outputs: same pattern under the output-cache lock, but without a retry.
    with self.db_output_cache_lock:
        try:
            self._writeout_output_cache(conn)
        except sqlite3.IntegrityError:
            print(
                "!! Session/line number for output was not unique",
                "in database. Output will not be stored.",
            )
        finally:
            # Always drop the cached outputs, even if they were not written.
            self.db_output_cache = []
# Where the platform provides os.register_at_fork, arrange for
# HistoryManager._stop_thread to be called in the parent process right
# before every fork().
# NOTE(review): presumably this stops the history-saving thread so it is not
# running across the fork; confirm against HistoryManager._stop_thread.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(before=HistoryManager._stop_thread)
1088from collections.abc import Callable, Iterator
1089from weakref import ReferenceType
@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that keeps the target of a weak reference alive.

    On entry the weak reference is called once and the result is kept in a
    local variable while the ``with`` body runs, so a referent that is still
    alive at entry cannot be garbage collected inside the block. The value
    handed to the ``with`` statement is the weak reference itself, not the
    referent.
    """
    keepalive = ref()
    yield ref
    # Release the strong reference once the body has finished.
    del keepalive
class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for the HistoryManager's save_flag to be set, then writes out
    the history cache. The main thread is responsible for setting the flag when
    the cache size reaches a defined threshold.

    Only a weak reference to the HistoryManager is stored; the thread exits
    once that reference is dead or :meth:`stop` has been called.
    """

    save_flag: threading.Event
    daemon: bool = True
    _stop_now: bool = False
    enabled: bool = True
    history_manager: ref[HistoryManager]
    _stopped = False

    def __init__(self, history_manager: HistoryManager) -> None:
        """Create the (not yet started) thread for ``history_manager``.

        The manager is kept as a weak reference, its ``enabled`` value is
        copied, and a fresh ``save_flag`` event is created.
        """
        super().__init__(name="IPythonHistorySavingThread")
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled
        self.save_flag = threading.Event()

    @only_when_enabled
    def run(self) -> None:
        """Open a connection, then write the cache each time the flag is set.

        ``stop`` is registered with ``atexit`` for the duration of the run.
        Any exception is reported with ``print`` rather than propagated.
        """
        atexit.register(self.stop)
        try:
            hm: ReferenceType[HistoryManager]
            # We need a separate db connection per thread:
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone, so there is nothing to
                    # save and no connection to open.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **hm().connection_options,  # type: ignore [union-attr]
                )
            try:
                while True:
                    self.save_flag.wait()
                    with hold(self.history_manager) as hm:
                        if hm() is None:
                            self._stop_now = True
                        if self._stop_now:
                            return
                        self.save_flag.clear()
                        if hm() is not None:
                            hm().writeout_cache(self.db)  # type: ignore [union-attr]
            finally:
                # Close the connection on every exit path, including errors.
                self.db.close()
        except Exception as e:
            print(
                (
                    "The history saving thread hit an unexpected error (%s). "
                    "History will not be written to the database."
                )
                % repr(e)
            )
        finally:
            atexit.unregister(self.stop)

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method."""
        if self._stopped:
            return
        self._stop_now = True
        # Wake the thread so it notices the stop request.
        self.save_flag.set()
        self._stopped = True
        # join() raises RuntimeError for a thread that was never started, and
        # a thread cannot join itself; a finished thread needs no join.
        if self.is_alive() and self is not threading.current_thread():
            self.join()

    def __del__(self) -> None:
        """Stop the thread when the object is garbage collected."""
        self.stop()
# Matches one whitespace-free history range token, e.g. ~5/8-~2/3, or ~4/
# (session prefix only, meaning the full session).
# Session numbers: ~N/ or N/ (the trailing slash is required by the pattern)
# Line numbers: N (just digits, no ~)
# Separator: "-" or ":" between the start and the end
# Range syntax: 4-6 (with end) or 4- (without end, means "onward")
range_re = re.compile(
    r"""
((?P<startsess>(?:~?\d+/)))?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>(?:~?\d+/)))?
 (?P<end>\d*))?
$""",
    re.VERBOSE,
)
def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    The string is split on whitespace and every token is matched against
    ``range_re``; tokens that do not match, or that carry neither a start
    line nor a start session, are skipped. With ``-`` as separator the end
    line is inclusive (so the yielded stop is ``end + 1``, or ``None`` when
    no end is given); otherwise the end is used as-is and defaults to
    ``start + 1``. A token spanning several sessions yields one tuple per
    session.

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    >>> list(extract_hist_ranges("~4/"))
    [(-4, 1, None)]
    >>> list(extract_hist_ranges("4-"))
    [(0, 4, None)]
    >>> list(extract_hist_ranges("~4/4-"))
    [(-4, 4, None)]
    """
    if ranges_str == "":
        # Everything from current session
        yield (0, 1, None)
        return

    for token in ranges_str.split():
        matched = range_re.match(token)
        if not matched:
            continue
        parts = matched.groupdict()

        raw_start = parts["start"]
        raw_end = parts["end"]
        if raw_start:
            first = int(raw_start)
            if parts["sep"] == "-":
                # Inclusive end; an open-ended range runs to the end.
                last = int(raw_end) + 1 if raw_end else None
            elif raw_end:
                last = int(raw_end)
            else:
                last = first + 1
        elif parts["startsess"]:
            # Session given without a line number: the whole session.
            first, last = 1, None
        else:
            continue

        first_sess_text = parts["startsess"] or "0"
        last_sess_text = parts["endsess"] or first_sess_text
        # "~N/" designates session -N; "N/" designates session N.
        first_sess = int(first_sess_text.rstrip("/").replace("~", "-"))
        last_sess = int(last_sess_text.rstrip("/").replace("~", "-"))
        assert last_sess >= first_sess, "start session must be earlier than end session"

        if last_sess == first_sess:
            yield (first_sess, first, last)
        else:
            # Multiple sessions in one range:
            yield (first_sess, first, None)
            for between in range(first_sess + 1, last_sess):
                yield (between, 1, None)
            yield (last_sess, 1, last)
1251def _format_lineno(session: int, line: int) -> str:
1252 """Helper function to format line numbers properly."""
1253 if session == 0:
1254 return str(line)
1255 return "%s#%s" % (session, line)