Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""History related magics and functionality"""
3from __future__ import annotations
5# Copyright (c) IPython Development Team.
6# Distributed under the terms of the Modified BSD License.
9import atexit
10import datetime
11import re
14import threading
15from pathlib import Path
17from collections import defaultdict
18from contextlib import contextmanager
19from dataclasses import dataclass
20from decorator import decorator
21from traitlets import (
22 Any,
23 Bool,
24 Dict,
25 Instance,
26 Integer,
27 List,
28 TraitError,
29 Unicode,
30 Union,
31 default,
32 observe,
33)
34from traitlets.config.configurable import LoggingConfigurable
36from IPython.paths import locate_profile
37from IPython.utils.decorators import undoc
38from typing import Tuple, Optional, TYPE_CHECKING
39from collections.abc import Iterable
40import typing
41from warnings import warn
42from weakref import ref, WeakSet
44if TYPE_CHECKING:
45 from IPython.core.interactiveshell import InteractiveShell
46 from IPython.config.Configuration import Configuration
# sqlite3 is treated as optional.  ``sqlite3_found`` records whether the import
# succeeded and is used as the default value of ``HistoryAccessor.enabled``.
try:
    from sqlite3 import DatabaseError, OperationalError
    import sqlite3

    # Columns declared as "timestamp" are stored as ISO-8601 text; convert
    # them back into datetime objects when they are read.
    sqlite3.register_converter(
        "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode())
    )

    sqlite3_found = True
except ModuleNotFoundError:
    sqlite3_found = False

    # Stand-ins so that ``except (DatabaseError, OperationalError)`` clauses
    # elsewhere in this module still resolve when sqlite3 is missing.
    class DatabaseError(Exception):  # type: ignore [no-redef]
        pass

    class OperationalError(Exception):  # type: ignore [no-redef]
        pass
67InOrInOut = typing.Union[str, tuple[str, Optional[str]]]
69# -----------------------------------------------------------------------------
70# Classes and functions
71# -----------------------------------------------------------------------------
@undoc
class DummyDB:
    """Stand-in database object that silently discards everything.

    It is installed as ``db`` when history is not enabled, so that callers can
    keep using ``execute``/``commit`` and the ``with`` statement unchanged.
    """

    def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
        # Every query yields no rows.  The instance arrives through *args.
        return list()

    def commit(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Accept and ignore a commit request."""
        return None

    def __enter__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Enter a ``with`` block; nothing is opened and None is returned."""
        return None

    def __exit__(self, *args, **kwargs):  # type: ignore [no-untyped-def]
        """Leave a ``with`` block; exceptions are not suppressed."""
        return None
@decorator
def only_when_enabled(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: run the method only if ``self.enabled`` is true.

    When the object is not enabled the wrapped method is skipped and an
    empty list is returned in its place.
    """
    if self.enabled:
        return f(self, *a, **kw)
    return []
102# use 16kB as threshold for whether a corrupt history db should be saved
103# that should be at least 100 entries or so
104_SAVE_DB_SIZE = 16384
@decorator
def catch_corrupt_db(f, self, *a, **kw):  # type: ignore [no-untyped-def]
    """Decorator: recover from a corrupt or unreadable SQLite history database.

    The wrapped ``HistoryAccessor`` method is called as usual.  If it raises
    ``DatabaseError`` or ``OperationalError`` the failure is logged and
    counted, and then:

    * if ``hist_file`` is already ``":memory:"`` the exception is re-raised;
    * if the failure counter exceeds ``_corrupt_db_limit``, ``hist_file`` is
      switched to ``":memory:"``;
    * otherwise, if the history file exists, it is renamed out of the way.

    In the last two cases ``init_db`` is called again and an empty list is
    returned in place of the method's result.

    We avoid clobbering larger databases because this may be triggered due to
    filesystem issues, not just a corrupt file.
    """
    try:
        return f(self, *a, **kw)
    except (DatabaseError, OperationalError) as e:
        self._corrupt_db_counter += 1
        self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
        if self.hist_file == ":memory:":
            # Failed with :memory:, something serious is wrong
            raise

        if self._corrupt_db_counter > self._corrupt_db_limit:
            self.hist_file = ":memory:"
            self.log.error(
                "Failed to load history too many times, history will not be saved."
            )
        else:
            # The hist_file trait accepts a Path or a plain string, so go
            # through Path before using any filesystem methods.
            hist_path = Path(self.hist_file)
            if hist_path.is_file():
                # move the file out of the way
                base = str(hist_path.parent / hist_path.stem)
                ext = hist_path.suffix
                size = hist_path.stat().st_size
                if size >= _SAVE_DB_SIZE:
                    # if there's significant content, avoid clobbering
                    now = (
                        datetime.datetime.now(datetime.timezone.utc)
                        .isoformat()
                        .replace(":", ".")
                    )
                    newpath = base + "-corrupt-" + now + ext
                    # don't clobber previous corrupt backups
                    for i in range(100):
                        if not Path(newpath).exists():
                            break
                        else:
                            newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
                else:
                    # not much content, possibly empty; don't worry about clobbering
                    # maybe we should just delete it?
                    newpath = base + "-corrupt" + ext
                hist_path.rename(newpath)
                self.log.error(
                    "History file was moved to %s and a new file created.", newpath
                )
        self.init_db()
        return []
class HistoryAccessorBase(LoggingConfigurable):
    """An abstract class for History Accessors

    Every method here raises ``NotImplementedError``; concrete accessors are
    expected to override them.
    """

    def get_tail(
        self,
        n: int = 10,
        raw: bool = True,
        output: bool = False,
        include_latest: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the last ``n`` history entries (not implemented here)."""
        raise NotImplementedError

    def search(
        self,
        pattern: str = "*",
        raw: bool = True,
        search_raw: bool = True,
        output: bool = False,
        n: Optional[int] = None,
        unique: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return history entries matching ``pattern`` (not implemented here)."""
        raise NotImplementedError

    def get_range(
        self,
        session: int,
        start: int = 1,
        stop: Optional[int] = None,
        raw: bool = True,
        output: bool = False,
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return a range of lines from one session (not implemented here)."""
        raise NotImplementedError

    def get_range_by_str(
        self, rangestr: str, raw: bool = True, output: bool = False
    ) -> Iterable[tuple[int, int, InOrInOut]]:
        """Return the lines named by a range string (not implemented here)."""
        raise NotImplementedError
200class HistoryAccessor(HistoryAccessorBase):
201 """Access the history database without adding to it.
203 This is intended for use by standalone history tools. IPython shells use
204 HistoryManager, below, which is a subclass of this."""
206 # counter for init_db retries, so we don't keep trying over and over
207 _corrupt_db_counter = 0
208 # after two failures, fallback on :memory:
209 _corrupt_db_limit = 2
211 # String holding the path to the history file
212 hist_file = Union(
213 [Instance(Path), Unicode()],
214 help="""Path to file to use for SQLite history database.
216 By default, IPython will put the history database in the IPython
217 profile directory. If you would rather share one history among
218 profiles, you can set this value in each, so that they are consistent.
220 Due to an issue with fcntl, SQLite is known to misbehave on some NFS
221 mounts. If you see IPython hanging, try setting this to something on a
222 local disk, e.g::
224 ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite
226 you can also use the specific value `:memory:` (including the colon
227 at both end but not the back ticks), to avoid creating an history file.
229 """,
230 ).tag(config=True)
232 enabled = Bool(
233 sqlite3_found,
234 help="""enable the SQLite history
236 set enabled=False to disable the SQLite history,
237 in which case there will be no stored history, no SQLite connection,
238 and no background saving thread. This may be necessary in some
239 threaded environments where IPython is embedded.
240 """,
241 ).tag(config=True)
243 connection_options = Dict(
244 help="""Options for configuring the SQLite connection
246 These options are passed as keyword args to sqlite3.connect
247 when establishing database connections.
248 """
249 ).tag(config=True)
@default("connection_options")
def _default_connection_options(self) -> dict[str, bool]:
    """Supply the default keyword arguments passed to ``sqlite3.connect``."""
    return {"check_same_thread": False}
255 # The SQLite database
256 db = Any()
@observe("db")
@only_when_enabled
def _db_changed(self, change):  # type: ignore [no-untyped-def]
    """Check a newly assigned ``db`` value, which may be one of two types.

    Raises
    ------
    TraitError
        If the new value is neither a ``sqlite3.Connection`` nor a ``DummyDB``.
    """
    candidate = change["new"]
    if isinstance(candidate, (DummyDB, sqlite3.Connection)):
        return
    raise TraitError(
        "%s.db must be sqlite3 Connection or DummyDB, not %r"
        % (self.__class__.__name__, candidate)
    )
def __init__(
    self, profile: str = "default", hist_file: str = "", **traits: typing.Any
) -> None:
    """Create a new history accessor and connect to its database.

    Parameters
    ----------
    profile : str
        Name of the profile whose history should be opened.  Only used
        when no history file has been set any other way.
    hist_file : str
        Path to an SQLite history database stored by IPython.  A non-empty
        value takes precedence over ``profile``.
    config : :class:`~traitlets.config.loader.Config`
        Config object; hist_file can also be set through this.
    """
    super().__init__(**traits)

    # The keyword is applied only after the parent initialiser has run and
    # only when non-empty; otherwise its default would clobber a value that
    # came from config.
    if hist_file:
        self.hist_file = hist_file

    try:
        self.hist_file
    except TraitError:
        # Nothing has provided a history file yet: derive it from the profile.
        self.hist_file = self._get_hist_file_name(profile)

    self.init_db()
def _get_hist_file_name(self, profile: str = "default") -> Path:
    """Return the path of the history database for a named profile.

    The HistoryManager subclass overrides this to use the shell's active
    profile instead.

    Parameters
    ----------
    profile : str
        The name of a profile which has a history file.
    """
    profile_dir = Path(locate_profile(profile))
    return profile_dir / "history.sqlite"
@catch_corrupt_db
def init_db(self) -> None:
    """Open the database connection and create any missing tables.

    When history is not enabled a ``DummyDB`` is installed instead and no
    connection is made.
    """
    if not self.enabled:
        self.db = DummyDB()
        return

    # detect_types makes timestamp columns come back as datetime objects;
    # user-supplied connection options are layered on top.
    connect_kwargs = {
        "detect_types": sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        **self.connection_options,
    }
    self.db = sqlite3.connect(str(self.hist_file), **connect_kwargs)  # type: ignore [call-overload]

    schema = (
        """CREATE TABLE IF NOT EXISTS sessions (session integer
                        primary key autoincrement, start timestamp,
                        end timestamp, num_cmds integer, remark text)""",
        """CREATE TABLE IF NOT EXISTS history
                (session integer, line integer, source text, source_raw text,
                PRIMARY KEY (session, line))""",
        # Output history is optional, but ensure the table's there so it can be
        # enabled later.
        """CREATE TABLE IF NOT EXISTS output_history
                        (session integer, line integer, output text,
                        PRIMARY KEY (session, line))""",
    )
    with self.db:
        for statement in schema:
            self.db.execute(statement)

    # success! reset corrupt db count
    self._corrupt_db_counter = 0
def writeout_cache(self) -> None:
    """Do nothing; a hook called before certain database lookups.

    HistoryManager overrides this to dump its cache first.
    """
    return None
351 ## -------------------------------
352 ## Methods for retrieving history:
353 ## -------------------------------
def _run_sql(
    self,
    sql: str,
    params: tuple,
    raw: bool = True,
    output: bool = False,
    latest: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Build and execute a SELECT against the history tables.

    Parameters
    ----------
    sql : str
        Any filtering expressions to go after SELECT ... FROM ...
    params : tuple
        Values bound to the "?" placeholders of the query
    raw, output : bool
        See :meth:`get_range`
    latest : bool
        Select rows with max (session, line)

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    columns = "source_raw" if raw else "source"
    if output:
        table = "history LEFT JOIN output_history USING (session, line)"
        columns = "history.%s, output_history.output" % columns
    else:
        table = "history"
    if latest:
        columns += ", MAX(session * 128 * 1024 + line)"

    query = "SELECT session, line, %s FROM %s " % (columns, table) + sql
    rows = self.db.execute(query, params)

    if latest:
        # Drop the helper MAX(...) column again.
        rows = (row[:-1] for row in rows)
    if output:
        # Regroup into 3-tuples whose last item is an (input, output) pair.
        return ((ses, lin, (inp, out)) for ses, lin, inp, out in rows)
    return rows
@only_when_enabled
@catch_corrupt_db
def get_session_info(
    self, session: int
) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
    """Look up the ``sessions`` row for one session.

    Parameters
    ----------
    session : int
        Session number to retrieve.

    Returns
    -------
    session_id : int
        Session ID number
    start : datetime
        Timestamp for the start of the session.
    end : datetime
        Timestamp for the end of the session, or None if IPython crashed.
    num_cmds : int
        Number of commands run, or None if IPython crashed.
    remark : str
        A manually set description.
    """
    cursor = self.db.execute(
        "SELECT * from sessions where session == ?", (session,)
    )
    return cursor.fetchone()
@catch_corrupt_db
def get_last_session_id(self) -> Optional[int]:
    """Return the session ID of the newest history entry, or None if empty.

    Within IPython, this should be the same as the value stored in
    :attr:`HistoryManager.session_number`.
    """
    newest = next(iter(self.get_tail(n=1, include_latest=True)), None)
    if newest is None:
        return None
    return newest[0]
@catch_corrupt_db
def get_tail(
    self,
    n: int = 10,
    raw: bool = True,
    output: bool = False,
    include_latest: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Return the last n lines of the history database, oldest first.

    Parameters
    ----------
    n : int
        The number of lines to get
    raw, output : bool
        See :meth:`get_range`
    include_latest : bool
        If False (default), n+1 lines are fetched, and the latest one
        is discarded. This is intended to be used where the function
        is called by a user command, which it should not return.

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    self.writeout_cache()
    limit = n if include_latest else n + 1
    newest_first = list(
        self._run_sql(
            "ORDER BY session DESC, line DESC LIMIT ?",
            (limit,),
            raw=raw,
            output=output,
        )
    )
    if not include_latest:
        # The first row is the most recent entry; drop it.
        newest_first = newest_first[1:]
    return reversed(newest_first)
@catch_corrupt_db
def search(
    self,
    pattern: str = "*",
    raw: bool = True,
    search_raw: bool = True,
    output: bool = False,
    n: Optional[int] = None,
    unique: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Search the history with a unix glob-style pattern (wildcards * and ?).

    Parameters
    ----------
    pattern : str
        The wildcarded pattern to match when searching
    search_raw : bool
        If True, search the raw input, otherwise, the parsed input
    raw, output : bool
        See :meth:`get_range`
    n : None or int
        If an integer is given, it defines the limit of
        returned entries.
    unique : bool
        When it is true, return only unique entries.

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    column = "source_raw" if search_raw else "source"
    if output:
        # The query joins two tables in this case, so qualify the column.
        column = "history." + column
    self.writeout_cache()

    clauses = ["WHERE %s GLOB ?" % column]
    params: tuple[typing.Any, ...] = (pattern,)
    if unique:
        clauses.append(" GROUP BY {0}".format(column))

    limited = n is not None
    if limited:
        clauses.append(" ORDER BY session DESC, line DESC LIMIT ?")
        params += (n,)
    elif unique:
        clauses.append(" ORDER BY session, line")

    rows = self._run_sql(
        "".join(clauses), params, raw=raw, output=output, latest=unique
    )
    # A limited query is fetched newest-first, so flip it before returning.
    return reversed(list(rows)) if limited else rows
@catch_corrupt_db
def get_range(
    self,
    session: int,
    start: int = 1,
    stop: Optional[int] = None,
    raw: bool = True,
    output: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Fetch a range of input lines belonging to one session.

    Parameters
    ----------
    session : int
        Session number to retrieve.
    start : int
        First line to retrieve.
    stop : int
        End of line range (excluded from output itself). If None, retrieve
        to the end of the session.
    raw : bool
        If True, return untranslated input
    output : bool
        If True, attempt to include output. This will be 'real' Python
        objects for the current session, or text reprs from previous
        sessions if db_log_output was enabled at the time. Where no output
        is found, None is used.

    Returns
    -------
    entries
        An iterator over the desired lines. Each line is a 3-tuple, either
        (session, line, input) if output is False, or
        (session, line, (input, output)) if output is True.
    """
    where: str
    params: tuple[typing.Any, ...]
    if stop:
        where = "WHERE session==? AND line >= ? AND line < ?"
        params = (session, start, stop)
    else:
        # No upper bound (any falsy ``stop``): read to the end of the session.
        where = "WHERE session==? AND line>=?"
        params = (session, start)

    return self._run_sql(where, params, raw=raw, output=output)
def get_range_by_str(
    self, rangestr: str, raw: bool = True, output: bool = False
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Yield history lines selected by a range string, as used by the magic
    commands %hist, %save, %macro, etc.

    Parameters
    ----------
    rangestr : str
        A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used,
        this will return everything from current session's history.

        See the documentation of :func:`%history` for the full details.

    raw, output : bool
        As :meth:`get_range`

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    for session, first, last in extract_hist_ranges(rangestr):
        yield from self.get_range(session, first, last, raw=raw, output=output)
@dataclass
class HistoryOutput:
    """A single recorded output: its kind plus the associated data bundle."""

    # Kind of output; restricted to the four literal values listed here.
    output_type: typing.Literal[
        "out_stream", "err_stream", "display_data", "execute_result"
    ]
    # String keys mapped to a string or a list of strings.
    # NOTE(review): presumably MIME type -> payload; confirm against the code
    # that fills this in.
    bundle: typing.Dict[str, str | list[str]]
597class HistoryManager(HistoryAccessor):
598 """A class to organize all history-related functionality in one place."""
600 # Public interface
602 # An instance of the IPython shell we are attached to
603 shell = Instance(
604 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False
605 )
606 # Lists to hold processed and raw history. These start with a blank entry
607 # so that we can index them starting from 1
608 input_hist_parsed = List([""])
609 input_hist_raw = List([""])
610 # A list of directories visited during session
611 dir_hist: List = List()
@default("dir_hist")
def _dir_hist_default(self) -> list[Path]:
    """Start the directory history with the current working directory.

    If the working directory cannot be determined (``OSError``), the
    history starts out empty.
    """
    try:
        cwd = Path.cwd()
    except OSError:
        return []
    return [cwd]
620 # A dict of output history, keyed with ints from the shell's
621 # execution count.
622 output_hist = Dict()
623 # The text/plain repr of outputs.
624 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment]
625 # Maps execution_count to MIME bundles
626 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list)
627 # Maps execution_count to exception tracebacks
628 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment]
630 # The number of the current session in the history database
631 session_number: int = Integer() # type: ignore [assignment]
633 db_log_output = Bool(
634 False, help="Should the history database include output? (default: no)"
635 ).tag(config=True)
636 db_cache_size = Integer(
637 0,
638 help="Write to database every x commands (higher values save disk access & power).\n"
639 "Values of 1 or less effectively disable caching.",
640 ).tag(config=True)
641 # The input and output caches
642 db_input_cache: List[tuple[int, str, str]] = List()
643 db_output_cache: List[tuple[int, str]] = List()
645 # History saving in separate thread
646 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True)
@property
def save_flag(self) -> threading.Event | None:
    """The saving thread's ``save_flag`` event, or None without a thread."""
    thread = self.save_thread
    return None if thread is None else thread.save_flag
654 # Private interface
655 # Variables used to store the three last inputs from the user. On each new
656 # history update, we populate the user's namespace with these, shifted as
657 # necessary.
658 _i00 = Unicode("")
659 _i = Unicode("")
660 _ii = Unicode("")
661 _iii = Unicode("")
663 # A regex matching all forms of the exit command, so that we don't store
664 # them in the history (it's annoying to rewind the first entry and land on
665 # an exit call).
666 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$")
668 _instances: WeakSet[HistoryManager] = WeakSet()
669 _max_inst: int | float = float("inf")
def __init__(
    self,
    shell: InteractiveShell,
    config: Optional[Configuration] = None,
    **traits: typing.Any,
):
    """Create a new history manager associated with a shell instance.

    Opens a new session in the history database and, when history is
    enabled and file-backed, starts the background saving thread.  If either
    step fails, the error is logged and ``hist_file`` is switched to
    ``":memory:"``.

    Parameters
    ----------
    shell : InteractiveShell
        The shell this manager is attached to.
    config : Configuration, optional
        Config object forwarded to the parent initialiser.
    **traits
        Further trait values forwarded to the parent initialiser.
    """
    super().__init__(shell=shell, config=config, **traits)
    # ``outputs`` is declared on the class as one shared defaultdict.  Give
    # every manager its own mapping so that instances do not see, or clear
    # via ``reset``, each other's recorded outputs.
    self.outputs = defaultdict(list)
    self.db_input_cache_lock = threading.Lock()
    self.db_output_cache_lock = threading.Lock()

    try:
        self.new_session()
    except OperationalError:
        self.log.error(
            "Failed to create history session in %s. History will not be saved.",
            self.hist_file,
            exc_info=True,
        )
        self.hist_file = ":memory:"

    if self.enabled and self.hist_file != ":memory:":
        self.save_thread = HistorySavingThread(self)
        try:
            self.save_thread.start()
        except RuntimeError:
            self.log.error(
                "Failed to start history saving thread. History will not be saved.",
                exc_info=True,
            )
            self.hist_file = ":memory:"
    self._instances.add(self)
    # Guard on the number of live managers; ``_max_inst`` defaults to infinity.
    assert len(HistoryManager._instances) <= HistoryManager._max_inst, (
        len(HistoryManager._instances),
        HistoryManager._max_inst,
    )
def __del__(self) -> None:
    """Stop the saving thread, if there is one, when the manager is collected."""
    thread = self.save_thread
    if thread is None:
        return
    thread.stop()
def _get_hist_file_name(self, profile: Optional[str] = None) -> Path:
    """Return the default history file inside the shell's profile directory.

    ``profile`` is accepted only for compatibility with the parent class
    and is not used.
    """
    return Path(self.shell.profile_dir.location) / "history.sqlite"
@only_when_enabled
def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Insert a new row into ``sessions`` and adopt its id as the session number.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to use; defaults to ``self.db``.
    """
    connection = self.db if conn is None else conn

    with connection:
        started = datetime.datetime.now().isoformat(" ")
        cur = connection.execute(
            """INSERT INTO sessions VALUES (NULL, ?, NULL,
                        NULL, '') """,
            (started,),
        )
        assert isinstance(cur.lastrowid, int)
        self.session_number = cur.lastrowid
def end_session(self) -> None:
    """Close the database session, filling in the end time and line count.

    Pending cache entries are written out first.  Afterwards
    ``session_number`` is reset to 0.
    """
    self.writeout_cache()
    with self.db:
        self.db.execute(
            """UPDATE sessions SET end=?, num_cmds=? WHERE
                        session==?""",
            (
                # Same naive local-time format that ``new_session`` uses for
                # ``start``, so both timestamps of a row can be compared
                # (mixing naive and timezone-aware datetimes raises TypeError).
                datetime.datetime.now().isoformat(" "),
                # The input list starts with a blank placeholder entry.
                len(self.input_hist_parsed) - 1,
                self.session_number,
            ),
        )
    self.session_number = 0
def name_session(self, name: str) -> None:
    """Store ``name`` as the remark of the current session (deprecated).

    Emits a ``DeprecationWarning`` before updating the database.
    """
    warn(
        "name_session is deprecated in IPython 9.0 and will be removed in future versions",
        DeprecationWarning,
        stacklevel=2,
    )
    update = "UPDATE sessions SET remark=? WHERE session==?"
    with self.db:
        self.db.execute(update, (name, self.session_number))
def reset(self, new_session: bool = True) -> None:
    """Drop the in-memory session history and, optionally, start a new session.

    Parameters
    ----------
    new_session : bool
        If True, end the current session (when there is one), clear the
        input history lists and open a new session.
    """
    # Empty the containers that hold on to user objects.
    for store in (self.output_hist, self.outputs, self.exceptions):
        store.clear()

    # The directory history can't be completely empty
    self.dir_hist[:] = [Path.cwd()]

    if not new_session:
        return

    if self.session_number:
        self.end_session()
    self.input_hist_parsed[:] = [""]
    self.input_hist_raw[:] = [""]
    self.new_session()
780 # ------------------------------
781 # Methods for retrieving history
782 # ------------------------------
def get_session_info(
    self, session: int = 0
) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
    """Look up a session, addressed relative to the current one.

    Parameters
    ----------
    session : int
        Session number to retrieve. The current session is 0, and negative
        numbers count back from current session, so -1 is the previous session.

    Returns
    -------
    session_id : int
        Session ID number
    start : datetime
        Timestamp for the start of the session.
    end : datetime
        Timestamp for the end of the session, or None if IPython crashed.
    num_cmds : int
        Number of commands run, or None if IPython crashed.
    remark : str
        A manually set description.
    """
    # Zero and negative numbers are offsets from the current session.
    absolute = session + self.session_number if session <= 0 else session
    return super().get_session_info(session=absolute)
@catch_corrupt_db
def get_tail(
    self,
    n: int = 10,
    raw: bool = True,
    output: bool = False,
    include_latest: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Return the last n history lines, most recent entry last.

    Lines of the current session are ranked as more recent than lines of
    any other session, so they end up at the tail of the result whenever
    possible.

    Parameters
    ----------
    n : int
        The number of lines to get
    raw, output : bool
        See :meth:`get_range`
    include_latest : bool
        If False (default), n+1 lines are fetched, and the latest one
        is discarded. This is intended to be used where the function
        is called by a user command, which it should not return.

    Returns
    -------
    Tuples as :meth:`get_range`
    """
    self.writeout_cache()
    if not include_latest:
        n += 1

    current_session = list(
        self._run_sql(
            "WHERE session == ? ORDER BY line DESC LIMIT ? ",
            (self.session_number, n),
            raw=raw,
            output=output,
        )
    )
    other_sessions = list(
        self._run_sql(
            "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?",
            (self.session_number, n),
            raw=raw,
            output=output,
        )
    )

    # Newest first: current session entries, then the remaining sessions.
    newest_first: list[tuple[int, int, InOrInOut]] = (
        current_session + other_sessions
    )[:n]

    if include_latest:
        return newest_first[::-1]
    # Reverse while leaving out index 0, the latest entry.
    return newest_first[:0:-1]
def _get_range_session(
    self,
    start: int = 1,
    stop: Optional[int] = None,
    raw: bool = True,
    output: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Yield input (and optionally output) lines of the current session.

    Called by get_range, and takes similar parameters.  Every yielded tuple
    carries 0 as its session number.
    """
    history = self.input_hist_raw if raw else self.input_hist_parsed
    size = len(history)

    # Negative bounds count from the end of the list.
    if start < 0:
        start += size
    if not stop or stop > size:
        stop = size
    elif stop < 0:
        stop += size

    entry: InOrInOut
    for index in range(start, stop):
        if output:
            entry = (history[index], self.output_hist_reprs.get(index))
        else:
            entry = history[index]
        yield (0, index, entry)
def get_range(
    self,
    session: int = 0,
    start: int = 1,
    stop: Optional[int] = None,
    raw: bool = True,
    output: bool = False,
) -> Iterable[tuple[int, int, InOrInOut]]:
    """Fetch a range of input lines from a session.

    Parameters
    ----------
    session : int
        Session number to retrieve. The current session is 0, and negative
        numbers count back from current session, so -1 is previous session.
    start : int
        First line to retrieve.
    stop : int
        End of line range (excluded from output itself). If None, retrieve
        to the end of the session.
    raw : bool
        If True, return untranslated input
    output : bool
        If True, attempt to include output. This will be 'real' Python
        objects for the current session, or text reprs from previous
        sessions if db_log_output was enabled at the time. Where no output
        is found, None is used.

    Returns
    -------
    entries
        An iterator over the desired lines. Each line is a 3-tuple, either
        (session, line, input) if output is False, or
        (session, line, (input, output)) if output is True.
    """
    current = self.session_number
    if session <= 0:
        session += current

    if session == current:
        # The current session is served from the in-memory lists.
        return self._get_range_session(start, stop, raw, output)
    return super().get_range(session, start, stop, raw, output)
938 ## ----------------------------
939 ## Methods for storing history:
940 ## ----------------------------
def store_inputs(
    self, line_num: int, source: str, source_raw: Optional[str] = None
) -> None:
    """Record one input in the history and refresh the ``_i*`` variables.

    Parameters
    ----------
    line_num : int
        The prompt number of this input.
    source : str
        Python input.
    source_raw : str, optional
        If given, this is the raw input without any IPython transformations
        applied to it.  If not given, ``source`` is used.
    """
    raw_text = (source if source_raw is None else source_raw).rstrip("\n")
    source = source.rstrip("\n")

    # do not store exit/quit commands
    if self._exit_re.match(raw_text.strip()):
        return

    self.input_hist_parsed.append(source)
    self.input_hist_raw.append(raw_text)

    with self.db_input_cache_lock:
        self.db_input_cache.append((line_num, source, raw_text))
        # Signal the saving thread once the cache has reached its threshold.
        if len(self.db_input_cache) >= self.db_cache_size and self.save_flag:
            self.save_flag.set()

    # Shift the automatic _i variables along by one input.
    self._iii, self._ii, self._i, self._i00 = (
        self._ii,
        self._i,
        self._i00,
        raw_text,
    )

    # hackish access to user namespace to create _i1,_i2... dynamically
    to_main = {
        "_i": self._i,
        "_ii": self._ii,
        "_iii": self._iii,
        "_i%s" % line_num: self._i00,
    }

    if self.shell is not None:
        self.shell.push(to_main, interactive=False)
def store_output(self, line_num: int) -> None:
    """Queue the text repr of one prompt's output for the database.

    Does nothing unless database output logging (``db_log_output``) is
    enabled and an output repr exists for ``line_num``.  It's called by
    run_cell after code has been executed.

    Parameters
    ----------
    line_num : int
        The line number from which to save outputs
    """
    if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
        return
    output = self.output_hist_reprs[line_num]

    with self.db_output_cache_lock:
        self.db_output_cache.append((line_num, output))

    # With caching effectively disabled, ask the saving thread (if there is
    # one) to write immediately.
    save_flag = self.save_flag
    if self.db_cache_size <= 1 and save_flag is not None:
        save_flag.set()
def _writeout_input_cache(self, conn: sqlite3.Connection) -> None:
    """Insert every cached input entry into the ``history`` table."""
    insert = "INSERT INTO history VALUES (?, ?, ?, ?)"
    with conn:
        for entry in self.db_input_cache:
            conn.execute(insert, (self.session_number, *entry))
def _writeout_output_cache(self, conn: sqlite3.Connection) -> None:
    """Insert every cached output entry into the ``output_history`` table."""
    insert = "INSERT INTO output_history VALUES (?, ?, ?)"
    with conn:
        for entry in self.db_output_cache:
            conn.execute(insert, (self.session_number, *entry))
@only_when_enabled
def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
    """Write any entries in the cache to the database.

    Parameters
    ----------
    conn : sqlite3.Connection, optional
        Connection to write through; defaults to ``self.db``.
    """
    if conn is None:
        conn = self.db

    with self.db_input_cache_lock:
        try:
            self._writeout_input_cache(conn)
        except sqlite3.IntegrityError:
            # A (session, line) key already exists: move to a fresh session
            # and try once more under the new session number.
            self.new_session(conn)
            print(
                "ERROR! Session/line number was not unique in",
                "database. History logging moved to new session",
                self.session_number,
            )
            try:
                # Try writing to the new session. If this fails, don't
                # recurse
                self._writeout_input_cache(conn)
            except sqlite3.IntegrityError:
                pass
        finally:
            # The input cache is emptied whether or not the write succeeded.
            self.db_input_cache = []

    with self.db_output_cache_lock:
        try:
            self._writeout_output_cache(conn)
        except sqlite3.IntegrityError:
            print(
                "!! Session/line number for output was not unique",
                "in database. Output will not be stored.",
            )
        finally:
            # Same for the output cache.
            self.db_output_cache = []
1062from collections.abc import Callable, Iterator
1063from weakref import ReferenceType
@contextmanager
def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
    """
    Context manager that holds a strong reference to the target of a weak
    reference, so that the target is not garbage collected while the context
    is active.  The weak reference itself is what gets yielded.
    """
    # Dereferencing keeps the target (if it is still alive) referenced locally.
    strong = ref()
    yield ref
    del strong
class HistorySavingThread(threading.Thread):
    """This thread takes care of writing history to the database, so that
    the UI isn't held up while that happens.

    It waits for its save_flag to be set, then writes out the history cache
    through the HistoryManager. The main thread is responsible for setting the
    flag when the cache size reaches a defined threshold.

    Only a weak reference to the HistoryManager is stored; a strong reference
    is taken (via ``hold``) just for the duration of each database operation,
    and the thread exits once the manager is gone.
    """

    # Event the thread blocks on; set to request a flush or to wake it for stop.
    save_flag: threading.Event
    # NOTE(review): a plain class attribute shadows the ``Thread.daemon``
    # property instead of going through its setter - confirm the thread really
    # ends up daemonic.
    daemon: bool = True
    # Set to make run() leave its loop the next time it wakes up.
    _stop_now: bool = False
    # Copied from the manager in __init__.
    # NOTE(review): presumably what ``only_when_enabled`` consults - confirm.
    enabled: bool = True
    history_manager: ref[HistoryManager]
    # True once stop() has run; makes later stop() calls no-ops.
    _stopped = False

    def __init__(self, history_manager: HistoryManager) -> None:
        """Create the (not yet started) thread for *history_manager*."""
        super().__init__(name="IPythonHistorySavingThread")
        # Created first so that stop() - which __del__ calls - always finds
        # it, even if one of the following lines raises.
        self.save_flag = threading.Event()
        self.history_manager = ref(history_manager)
        self.enabled = history_manager.enabled

    @only_when_enabled
    def run(self) -> None:
        """Open a private database connection and flush the cache on demand.

        The loop wakes whenever ``save_flag`` is set. It returns when
        ``_stop_now`` is set or the HistoryManager has been garbage collected.
        Any other exception is reported on stdout and ends the thread. The
        connection is closed on every exit path.
        """
        atexit.register(self.stop)
        try:
            hm: ReferenceType[HistoryManager]
            # We need a separate db connection per thread:
            with hold(self.history_manager) as hm:
                if hm() is None:
                    # The manager is already gone: there is no database to
                    # open and nothing that could ever be flushed.
                    return
                self.db = sqlite3.connect(
                    str(hm().hist_file),  # type: ignore [union-attr]
                    **hm().connection_options,  # type: ignore [union-attr]
                )
            try:
                while True:
                    self.save_flag.wait()
                    # The manager is only pinned while we work with it, never
                    # while we sleep on the flag.
                    with hold(self.history_manager) as hm:
                        if hm() is None:
                            self._stop_now = True
                        if self._stop_now:
                            return
                        self.save_flag.clear()
                        if hm() is not None:
                            hm().writeout_cache(self.db)  # type: ignore [union-attr]
            finally:
                # Runs for a normal stop and for errors alike, so the
                # connection is never left open.
                self.db.close()
        except Exception as e:
            print(
                "The history saving thread hit an unexpected error "
                f"({e!r}). History will not be written to the database."
            )
        finally:
            atexit.unregister(self.stop)

    def stop(self) -> None:
        """This can be called from the main thread to safely stop this thread.

        Note that it does not attempt to write out remaining history before
        exiting. That should be done by calling the HistoryManager's
        end_session method.

        Calling it more than once, on a thread that was never started, or from
        the saving thread itself is harmless.
        """
        if self._stopped:
            return
        self._stop_now = True
        # Wake the thread so it notices ``_stop_now``.
        self.save_flag.set()
        self._stopped = True
        # join() raises RuntimeError for a thread that has not been started
        # and for the current thread, so only wait for a running, foreign one.
        if self.is_alive() and self is not threading.current_thread():
            self.join()

    def __del__(self) -> None:
        self.stop()
# To match, e.g. ~5/8-~2/3, or ~4/ (a session prefix on its own selects that
# whole session; the trailing slash is required, so a bare ~4 does not match)
# Session prefixes: ~N/ or N/
# Line numbers: N (just digits, no ~)
# Range syntax: 4-6 (with end) or 4- (without end, means "onward")
range_re = re.compile(
    r"""
((?P<startsess>(?:~?\d+/)))?
(?P<start>\d+)?
((?P<sep>[\-:])
 ((?P<endsess>(?:~?\d+/)))?
 (?P<end>\d*))?
$""",
    re.VERBOSE,
)


def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
    """Turn a string of history ranges into 3-tuples of (session, start, stop).

    Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
    session".

    The string is split on whitespace and every token is parsed on its own.
    Tokens that do not match the range syntax, or that name neither a start
    line nor a session, are skipped silently. ``stop`` is exclusive; ``None``
    means "until the end of the session". A ``-`` separator makes the written
    end inclusive, a ``:`` separator makes it exclusive. A range spanning
    several sessions yields one tuple per session.

    Raises
    ------
    AssertionError
        If a token's end session is earlier than its start session.

    Examples
    --------
    >>> list(extract_hist_ranges("~8/5-~7/4 2"))
    [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
    >>> list(extract_hist_ranges("~4/"))
    [(-4, 1, None)]
    >>> list(extract_hist_ranges("4-"))
    [(0, 4, None)]
    >>> list(extract_hist_ranges("~4/4-"))
    [(-4, 4, None)]
    """
    if ranges_str == "":
        yield (0, 1, None)  # Everything from current session
        return

    for range_str in ranges_str.split():
        rmatch = range_re.match(range_str)
        if not rmatch:
            continue

        start_text = rmatch.group("start")
        stop: Optional[int]
        if start_text:
            start = int(start_text)
            # None when there is no separator, "" for an open range like "4-".
            end_text = rmatch.group("end")
            if rmatch.group("sep") == "-":
                # Inclusive end; an open end means "onward".
                stop = int(end_text) + 1 if end_text else None
            else:
                # Exclusive end; without one, select just the start line.
                stop = int(end_text) if end_text else start + 1
        else:
            if not rmatch.group("startsess"):
                continue
            # Only a session was given: take all of it.
            start = 1
            stop = None

        # "~N/" means N sessions back, i.e. session number -N.
        startsess_text = rmatch.group("startsess") or "0"
        endsess_text = rmatch.group("endsess") or startsess_text
        startsess = int(startsess_text.rstrip("/").replace("~", "-"))
        endsess = int(endsess_text.rstrip("/").replace("~", "-"))
        # Raised explicitly (rather than with ``assert``) so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if endsess < startsess:
            raise AssertionError("start session must be earlier than end session")

        if endsess == startsess:
            yield (startsess, start, stop)
            continue
        # Multiple sessions in one range:
        yield (startsess, start, None)
        for sess in range(startsess + 1, endsess):
            yield (sess, 1, None)
        yield (endsess, 1, stop)
def _format_lineno(session: int, line: int) -> str:
    """Render a history line number, qualified by its session when needed.

    Lines of session 0 are shown as the bare line number; lines of any other
    session are shown as ``<session>#<line>``.
    """
    return str(line) if session == 0 else f"{session}#{line}"