Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/IPython/core/history.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""History related magics and functionality"""
3from __future__ import annotations
5# Copyright (c) IPython Development Team.
6# Distributed under the terms of the Modified BSD License.
9import atexit
10import datetime
11import os
12import re
13import weakref
16import threading
17from pathlib import Path
19from collections import defaultdict
20from contextlib import contextmanager
21from dataclasses import dataclass
22from decorator import decorator
23from traitlets import (
24 Any,
25 Bool,
26 Dict,
27 Instance,
28 Integer,
29 List,
30 TraitError,
31 Unicode,
32 Union,
33 default,
34 observe,
35)
36from traitlets.config.configurable import LoggingConfigurable
38from IPython.paths import locate_profile
39from IPython.utils.decorators import undoc
40from typing import Tuple, Optional, TYPE_CHECKING
41from collections.abc import Iterable
42import typing
43import typing as t
44from typing import cast
45from warnings import warn
46from weakref import ref, WeakSet
48if TYPE_CHECKING:
49 from IPython.core.interactiveshell import InteractiveShell
50 from IPython.config.Configuration import Configuration
52try:
53 from sqlite3 import DatabaseError, OperationalError
54 import sqlite3
56 sqlite3.register_converter(
57 "timestamp", lambda val: datetime.datetime.fromisoformat(val.decode())
58 )
60 sqlite3_found = True
61except ModuleNotFoundError:
62 sqlite3_found = False
64 class DatabaseError(Exception): # type: ignore [no-redef]
65 pass
67 class OperationalError(Exception): # type: ignore [no-redef]
68 pass
71InOrInOut = typing.Union[str, tuple[str, Optional[str]]]
73# -----------------------------------------------------------------------------
74# Classes and functions
75# -----------------------------------------------------------------------------
78@undoc
79class DummyDB:
80 """Dummy DB that will act as a black hole for history.
82 Only used in the absence of sqlite"""
84 def execute(*args: typing.Any, **kwargs: typing.Any) -> list:
85 return []
87 def commit(self, *args, **kwargs): # type: ignore [no-untyped-def]
88 pass
90 def __enter__(self, *args, **kwargs): # type: ignore [no-untyped-def]
91 pass
93 def __exit__(self, *args, **kwargs): # type: ignore [no-untyped-def]
94 pass
96 def close(self, *args, **kwargs): # type: ignore [no-untyped-def]
97 pass
100@decorator
101def only_when_enabled(f, self, *a, **kw): # type: ignore [no-untyped-def]
102 """Decorator: return an empty list in the absence of sqlite."""
103 if not self.enabled:
104 return []
105 else:
106 return f(self, *a, **kw)
109# use 16kB as threshold for whether a corrupt history db should be saved
110# that should be at least 100 entries or so
111_SAVE_DB_SIZE = 16384
114@decorator
115def catch_corrupt_db(f, self, *a, **kw): # type: ignore [no-untyped-def]
116 """A decorator which wraps HistoryAccessor method calls to catch errors from
117 a corrupt SQLite database, move the old database out of the way, and create
118 a new one.
120 We avoid clobbering larger databases because this may be triggered due to filesystem issues,
121 not just a corrupt file.
122 """
123 try:
124 return f(self, *a, **kw)
125 except (DatabaseError, OperationalError) as e:
126 self._corrupt_db_counter += 1
127 self.log.error("Failed to open SQLite history %s (%s).", self.hist_file, e)
128 if self.hist_file != ":memory:":
129 if self._corrupt_db_counter > self._corrupt_db_limit:
130 self.hist_file = ":memory:"
131 self.log.error(
132 "Failed to load history too many times, history will not be saved."
133 )
134 elif self.hist_file.is_file():
135 # move the file out of the way
136 base = str(self.hist_file.parent / self.hist_file.stem)
137 ext = self.hist_file.suffix
138 size = self.hist_file.stat().st_size
139 if size >= _SAVE_DB_SIZE:
140 # if there's significant content, avoid clobbering
141 now = (
142 datetime.datetime.now(datetime.timezone.utc)
143 .isoformat()
144 .replace(":", ".")
145 )
146 newpath = base + "-corrupt-" + now + ext
147 # don't clobber previous corrupt backups
148 for i in range(100):
149 if not Path(newpath).exists():
150 break
151 else:
152 newpath = base + "-corrupt-" + now + ("-%i" % i) + ext
153 else:
154 # not much content, possibly empty; don't worry about clobbering
155 # maybe we should just delete it?
156 newpath = base + "-corrupt" + ext
157 self.hist_file.rename(newpath)
158 self.log.error(
159 "History file was moved to %s and a new file created.", newpath
160 )
161 self.init_db()
162 return []
163 else:
164 # Failed with :memory:, something serious is wrong
165 raise
168class HistoryAccessorBase(LoggingConfigurable):
169 """An abstract class for History Accessors"""
171 def get_tail(
172 self,
173 n: int = 10,
174 raw: bool = True,
175 output: bool = False,
176 include_latest: bool = False,
177 ) -> Iterable[tuple[int, int, InOrInOut]]:
178 raise NotImplementedError
180 def search(
181 self,
182 pattern: str = "*",
183 raw: bool = True,
184 search_raw: bool = True,
185 output: bool = False,
186 n: Optional[int] = None,
187 unique: bool = False,
188 ) -> Iterable[tuple[int, int, InOrInOut]]:
189 raise NotImplementedError
191 def get_range(
192 self,
193 session: int,
194 start: int = 1,
195 stop: Optional[int] = None,
196 raw: bool = True,
197 output: bool = False,
198 ) -> Iterable[tuple[int, int, InOrInOut]]:
199 raise NotImplementedError
201 def get_range_by_str(
202 self, rangestr: str, raw: bool = True, output: bool = False
203 ) -> Iterable[tuple[int, int, InOrInOut]]:
204 raise NotImplementedError
207class HistoryAccessor(HistoryAccessorBase):
208 """Access the history database without adding to it.
210 This is intended for use by standalone history tools. IPython shells use
211 HistoryManager, below, which is a subclass of this."""
213 # counter for init_db retries, so we don't keep trying over and over
214 _corrupt_db_counter = 0
215 # after two failures, fallback on :memory:
216 _corrupt_db_limit = 2
218 # String holding the path to the history file
219 hist_file = Union(
220 [Instance(Path), Unicode()],
221 help="""Path to file to use for SQLite history database.
223 By default, IPython will put the history database in the IPython
224 profile directory. If you would rather share one history among
225 profiles, you can set this value in each, so that they are consistent.
227 Due to an issue with fcntl, SQLite is known to misbehave on some NFS
228 mounts. If you see IPython hanging, try setting this to something on a
229 local disk, e.g::
231 ipython --HistoryManager.hist_file=/tmp/ipython_hist.sqlite
233 you can also use the specific value `:memory:` (including the colon
234 at both end but not the back ticks), to avoid creating an history file.
236 """,
237 ).tag(config=True)
239 enabled = Bool(
240 sqlite3_found,
241 help="""enable the SQLite history
243 set enabled=False to disable the SQLite history,
244 in which case there will be no stored history, no SQLite connection,
245 and no background saving thread. This may be necessary in some
246 threaded environments where IPython is embedded.
247 """,
248 ).tag(config=True)
250 connection_options = Dict(
251 help="""Options for configuring the SQLite connection
253 These options are passed as keyword args to sqlite3.connect
254 when establishing database connections.
255 """
256 ).tag(config=True)
258 @default("connection_options")
259 def _default_connection_options(self) -> dict[str, bool]:
260 return dict(check_same_thread=False)
262 # The SQLite database
263 db = Any()
265 @observe("db")
266 @only_when_enabled
267 def _db_changed(self, change): # type: ignore [no-untyped-def]
268 """validate the db, since it can be an Instance of two different types"""
269 new = change["new"]
270 connection_types = (DummyDB, sqlite3.Connection)
271 if not isinstance(new, connection_types):
272 msg = "%s.db must be sqlite3 Connection or DummyDB, not %r" % (
273 self.__class__.__name__,
274 new,
275 )
276 raise TraitError(msg)
278 def __init__(
279 self, profile: str = "default", hist_file: str = "", **traits: typing.Any
280 ) -> None:
281 """Create a new history accessor.
283 Parameters
284 ----------
285 profile : str
286 The name of the profile from which to open history.
287 hist_file : str
288 Path to an SQLite history database stored by IPython. If specified,
289 hist_file overrides profile.
290 config : :class:`~traitlets.config.loader.Config`
291 Config object. hist_file can also be set through this.
292 """
293 super(HistoryAccessor, self).__init__(**traits)
294 # defer setting hist_file from kwarg until after init,
295 # otherwise the default kwarg value would clobber any value
296 # set by config
297 if hist_file:
298 self.hist_file = hist_file
300 try:
301 self.hist_file
302 except TraitError:
303 # No one has set the hist_file, yet.
304 self.hist_file = self._get_hist_file_name(profile)
306 self.init_db()
308 def _get_hist_file_name(self, profile: str = "default") -> Path:
309 """Find the history file for the given profile name.
311 This is overridden by the HistoryManager subclass, to use the shell's
312 active profile.
314 Parameters
315 ----------
316 profile : str
317 The name of a profile which has a history file.
318 """
319 return Path(locate_profile(profile)) / "history.sqlite"
321 @catch_corrupt_db
322 def init_db(self) -> None:
323 """Connect to the database, and create tables if necessary."""
324 if not self.enabled:
325 self.db = DummyDB()
326 self._finalizer = weakref.finalize(self, lambda db: db.close(), self.db)
327 return
329 # use detect_types so that timestamps return datetime objects
330 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
331 kwargs.update(self.connection_options)
332 self.db = sqlite3.connect(str(self.hist_file), **kwargs) # type: ignore [call-overload]
333 self._finalizer = weakref.finalize(self, lambda db: db.close(), self.db)
334 with self.db:
335 self.db.execute(
336 """CREATE TABLE IF NOT EXISTS sessions (session integer
337 primary key autoincrement, start timestamp,
338 end timestamp, num_cmds integer, remark text)"""
339 )
340 self.db.execute(
341 """CREATE TABLE IF NOT EXISTS history
342 (session integer, line integer, source text, source_raw text,
343 PRIMARY KEY (session, line))"""
344 )
345 # Output history is optional, but ensure the table's there so it can be
346 # enabled later.
347 self.db.execute(
348 """CREATE TABLE IF NOT EXISTS output_history
349 (session integer, line integer, output text,
350 PRIMARY KEY (session, line))"""
351 )
352 # success! reset corrupt db count
353 self._corrupt_db_counter = 0
355 def writeout_cache(self) -> None:
356 """Overridden by HistoryManager to dump the cache before certain
357 database lookups."""
358 pass
360 ## -------------------------------
361 ## Methods for retrieving history:
362 ## -------------------------------
363 def _run_sql(
364 self,
365 sql: str,
366 params: tuple,
367 raw: bool = True,
368 output: bool = False,
369 latest: bool = False,
370 ) -> Iterable[tuple[int, int, InOrInOut]]:
371 """Prepares and runs an SQL query for the history database.
373 Parameters
374 ----------
375 sql : str
376 Any filtering expressions to go after SELECT ... FROM ...
377 params : tuple
378 Parameters passed to the SQL query (to replace "?")
379 raw, output : bool
380 See :meth:`get_range`
381 latest : bool
382 Select rows with max (session, line)
384 Returns
385 -------
386 Tuples as :meth:`get_range`
387 """
388 toget = "source_raw" if raw else "source"
389 sqlfrom = "history"
390 if output:
391 sqlfrom = "history LEFT JOIN output_history USING (session, line)"
392 toget = "history.%s, output_history.output" % toget
393 if latest:
394 toget += ", MAX(session * 128 * 1024 + line)"
395 this_querry = "SELECT session, line, %s FROM %s " % (toget, sqlfrom) + sql
396 cur = self.db.execute(this_querry, params)
397 if latest:
398 cur = (row[:-1] for row in cur)
399 if output: # Regroup into 3-tuples, and parse JSON
400 return ((ses, lin, (inp, out)) for ses, lin, inp, out in cur)
401 return cur
403 @only_when_enabled
404 @catch_corrupt_db
405 def get_session_info(
406 self, session: int
407 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
408 """Get info about a session.
410 Parameters
411 ----------
412 session : int
413 Session number to retrieve.
415 Returns
416 -------
417 session_id : int
418 Session ID number
419 start : datetime
420 Timestamp for the start of the session.
421 end : datetime
422 Timestamp for the end of the session, or None if IPython crashed.
423 num_cmds : int
424 Number of commands run, or None if IPython crashed.
425 remark : str
426 A manually set description.
427 """
428 query = "SELECT * from sessions where session == ?"
429 return self.db.execute(query, (session,)).fetchone()
431 @catch_corrupt_db
432 def get_last_session_id(self) -> Optional[int]:
433 """Get the last session ID currently in the database.
435 Within IPython, this should be the same as the value stored in
436 :attr:`HistoryManager.session_number`.
437 """
438 for record in self.get_tail(n=1, include_latest=True):
439 return record[0]
440 return None
442 @catch_corrupt_db
443 def get_tail(
444 self,
445 n: int = 10,
446 raw: bool = True,
447 output: bool = False,
448 include_latest: bool = False,
449 ) -> Iterable[tuple[int, int, InOrInOut]]:
450 """Get the last n lines from the history database.
452 Parameters
453 ----------
454 n : int
455 The number of lines to get
456 raw, output : bool
457 See :meth:`get_range`
458 include_latest : bool
459 If False (default), n+1 lines are fetched, and the latest one
460 is discarded. This is intended to be used where the function
461 is called by a user command, which it should not return.
463 Returns
464 -------
465 Tuples as :meth:`get_range`
466 """
467 self.writeout_cache()
468 if not include_latest:
469 n += 1
470 cur = self._run_sql(
471 "ORDER BY session DESC, line DESC LIMIT ?", (n,), raw=raw, output=output
472 )
473 if not include_latest:
474 return reversed(list(cur)[1:])
475 return reversed(list(cur))
477 @catch_corrupt_db
478 def search(
479 self,
480 pattern: str = "*",
481 raw: bool = True,
482 search_raw: bool = True,
483 output: bool = False,
484 n: Optional[int] = None,
485 unique: bool = False,
486 ) -> Iterable[tuple[int, int, InOrInOut]]:
487 """Search the database using unix glob-style matching (wildcards
488 * and ?).
490 Parameters
491 ----------
492 pattern : str
493 The wildcarded pattern to match when searching
494 search_raw : bool
495 If True, search the raw input, otherwise, the parsed input
496 raw, output : bool
497 See :meth:`get_range`
498 n : None or int
499 If an integer is given, it defines the limit of
500 returned entries.
501 unique : bool
502 When it is true, return only unique entries.
504 Returns
505 -------
506 Tuples as :meth:`get_range`
507 """
508 tosearch = "source_raw" if search_raw else "source"
509 if output:
510 tosearch = "history." + tosearch
511 self.writeout_cache()
512 sqlform = "WHERE %s GLOB ?" % tosearch
513 params: tuple[typing.Any, ...] = (pattern,)
514 if unique:
515 sqlform += " GROUP BY {0}".format(tosearch)
516 if n is not None:
517 sqlform += " ORDER BY session DESC, line DESC LIMIT ?"
518 params += (n,)
519 elif unique:
520 sqlform += " ORDER BY session, line"
521 cur = self._run_sql(sqlform, params, raw=raw, output=output, latest=unique)
522 if n is not None:
523 return reversed(list(cur))
524 return cur
526 @catch_corrupt_db
527 def get_range(
528 self,
529 session: int,
530 start: int = 1,
531 stop: Optional[int] = None,
532 raw: bool = True,
533 output: bool = False,
534 ) -> Iterable[tuple[int, int, InOrInOut]]:
535 """Retrieve input by session.
537 Parameters
538 ----------
539 session : int
540 Session number to retrieve.
541 start : int
542 First line to retrieve.
543 stop : int
544 End of line range (excluded from output itself). If None, retrieve
545 to the end of the session.
546 raw : bool
547 If True, return untranslated input
548 output : bool
549 If True, attempt to include output. This will be 'real' Python
550 objects for the current session, or text reprs from previous
551 sessions if db_log_output was enabled at the time. Where no output
552 is found, None is used.
554 Returns
555 -------
556 entries
557 An iterator over the desired lines. Each line is a 3-tuple, either
558 (session, line, input) if output is False, or
559 (session, line, (input, output)) if output is True.
560 """
561 params: tuple[typing.Any, ...]
562 if stop:
563 lineclause = "line >= ? AND line < ?"
564 params = (session, start, stop)
565 else:
566 lineclause = "line>=?"
567 params = (session, start)
569 return self._run_sql(
570 "WHERE session==? AND %s" % lineclause, params, raw=raw, output=output
571 )
573 def get_range_by_str(
574 self, rangestr: str, raw: bool = True, output: bool = False
575 ) -> Iterable[tuple[int, int, InOrInOut]]:
576 """Get lines of history from a string of ranges, as used by magic
577 commands %hist, %save, %macro, etc.
579 Parameters
580 ----------
581 rangestr : str
582 A string specifying ranges, e.g. "5 ~2/1-4". If empty string is used,
583 this will return everything from current session's history.
585 See the documentation of :func:`%history` for the full details.
587 raw, output : bool
588 As :meth:`get_range`
590 Returns
591 -------
592 Tuples as :meth:`get_range`
593 """
594 for sess, s, e in extract_hist_ranges(rangestr):
595 yield from self.get_range(sess, s, e, raw=raw, output=output)
598@dataclass
599class HistoryOutput:
600 output_type: typing.Literal[
601 "out_stream", "err_stream", "display_data", "execute_result"
602 ]
603 bundle: typing.Dict[str, str | list[str]]
606class HistoryManager(HistoryAccessor):
607 """A class to organize all history-related functionality in one place."""
609 # Public interface
611 # An instance of the IPython shell we are attached to
612 shell = Instance(
613 "IPython.core.interactiveshell.InteractiveShellABC", allow_none=False
614 )
615 # Lists to hold processed and raw history. These start with a blank entry
616 # so that we can index them starting from 1
617 input_hist_parsed = List([""])
618 input_hist_raw = List([""])
619 # A list of directories visited during session
620 dir_hist: List = List()
622 @default("dir_hist")
623 def _dir_hist_default(self) -> list[Path]:
624 try:
625 return [Path.cwd()]
626 except OSError:
627 return []
629 # A dict of output history, keyed with ints from the shell's
630 # execution count.
631 output_hist = Dict()
632 # The text/plain repr of outputs.
633 output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment]
634 # Maps execution_count to MIME bundles
635 outputs: typing.Dict[int, typing.List[HistoryOutput]] = defaultdict(list)
636 # Maps execution_count to exception tracebacks
637 exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment]
639 # The number of the current session in the history database
640 session_number: int = Integer() # type: ignore [assignment]
642 db_log_output = Bool(
643 False, help="Should the history database include output? (default: no)"
644 ).tag(config=True)
645 db_cache_size = Integer(
646 0,
647 help="Write to database every x commands (higher values save disk access & power).\n"
648 "Values of 1 or less effectively disable caching.",
649 ).tag(config=True)
650 # The input and output caches
651 db_input_cache: List[tuple[int, str, str]] = List()
652 db_output_cache: List[tuple[int, str]] = List()
654 # History saving in separate thread
655 save_thread = Instance("IPython.core.history.HistorySavingThread", allow_none=True)
657 @property
658 def save_flag(self) -> threading.Event | None:
659 if self.save_thread is not None:
660 return self.save_thread.save_flag
661 return None
663 # Private interface
664 # Variables used to store the three last inputs from the user. On each new
665 # history update, we populate the user's namespace with these, shifted as
666 # necessary.
667 _i00 = Unicode("")
668 _i = Unicode("")
669 _ii = Unicode("")
670 _iii = Unicode("")
672 # A regex matching all forms of the exit command, so that we don't store
673 # them in the history (it's annoying to rewind the first entry and land on
674 # an exit call).
675 _exit_re = re.compile(r"(exit|quit)(\s*\(.*\))?$")
677 _instances: WeakSet[HistoryManager] = WeakSet()
678 _max_inst: int | float = float("inf")
680 def __init__(
681 self,
682 shell: InteractiveShell,
683 config: Optional[Configuration] = None,
684 **traits: typing.Any,
685 ):
686 """Create a new history manager associated with a shell instance."""
687 super().__init__(shell=shell, config=config, **traits)
688 self.db_input_cache_lock = threading.Lock()
689 self.db_output_cache_lock = threading.Lock()
691 try:
692 self.new_session()
693 except OperationalError:
694 self.log.error(
695 "Failed to create history session in %s. History will not be saved.",
696 self.hist_file,
697 exc_info=True,
698 )
699 self.hist_file = ":memory:"
701 self.using_thread = False
702 if self.enabled and self.hist_file != ":memory:":
703 self.save_thread = HistorySavingThread(self)
704 try:
705 self.save_thread.start()
706 except RuntimeError:
707 self.log.error(
708 "Failed to start history saving thread. History will not be saved.",
709 exc_info=True,
710 )
711 self.hist_file = ":memory:"
712 else:
713 self.using_thread = True
714 self._instances.add(self)
715 assert len(HistoryManager._instances) <= HistoryManager._max_inst, (
716 len(HistoryManager._instances),
717 HistoryManager._max_inst,
718 )
720 def __del__(self) -> None:
721 if self.save_thread is not None:
722 self.save_thread.stop()
724 @classmethod
725 def _stop_thread(cls) -> None:
726 # Used before forking so the thread isn't running at fork
727 for inst in cls._instances:
728 if inst.save_thread is not None:
729 inst.save_thread.stop()
730 inst.save_thread = None
732 def _restart_thread_if_stopped(self) -> None:
733 # Start the thread again after it was stopped for forking
734 if self.save_thread is None and self.using_thread:
735 self.save_thread = HistorySavingThread(self)
736 self.save_thread.start()
738 def _get_hist_file_name(self, profile: Optional[str] = None) -> Path:
739 """Get default history file name based on the Shell's profile.
741 The profile parameter is ignored, but must exist for compatibility with
742 the parent class."""
743 profile_dir = self.shell.profile_dir.location
744 return Path(profile_dir) / "history.sqlite"
746 @only_when_enabled
747 def new_session(self, conn: Optional[sqlite3.Connection] = None) -> None:
748 """Get a new session number."""
749 if conn is None:
750 conn = self.db
752 with conn:
753 cur = conn.execute(
754 """INSERT INTO sessions VALUES (NULL, ?, NULL,
755 NULL, '') """,
756 (datetime.datetime.now().isoformat(" "),),
757 )
758 assert isinstance(cur.lastrowid, int)
759 self.session_number = cur.lastrowid
761 def end_session(self) -> None:
762 """Close the database session, filling in the end time and line count."""
763 self.writeout_cache()
764 with self.db:
765 self.db.execute(
766 """UPDATE sessions SET end=?, num_cmds=? WHERE
767 session==?""",
768 (
769 datetime.datetime.now(datetime.timezone.utc).isoformat(" "),
770 len(self.input_hist_parsed) - 1,
771 self.session_number,
772 ),
773 )
774 self.session_number = 0
776 def name_session(self, name: str) -> None:
777 """Give the current session a name in the history database."""
778 warn(
779 "name_session is deprecated in IPython 9.0 and will be removed in future versions",
780 DeprecationWarning,
781 stacklevel=2,
782 )
783 with self.db:
784 self.db.execute(
785 "UPDATE sessions SET remark=? WHERE session==?",
786 (name, self.session_number),
787 )
789 def reset(self, new_session: bool = True) -> None:
790 """Clear the session history, releasing all object references, and
791 optionally open a new session."""
792 self.output_hist.clear()
793 self.outputs.clear()
794 self.exceptions.clear()
796 # The directory history can't be completely empty
797 self.dir_hist[:] = [Path.cwd()]
799 if new_session:
800 if self.session_number:
801 self.end_session()
802 self.input_hist_parsed[:] = [""]
803 self.input_hist_raw[:] = [""]
804 self.new_session()
806 # ------------------------------
807 # Methods for retrieving history
808 # ------------------------------
809 def get_session_info(
810 self, session: int = 0
811 ) -> tuple[int, datetime.datetime, Optional[datetime.datetime], Optional[int], str]:
812 """Get info about a session.
814 Parameters
815 ----------
816 session : int
817 Session number to retrieve. The current session is 0, and negative
818 numbers count back from current session, so -1 is the previous session.
820 Returns
821 -------
822 session_id : int
823 Session ID number
824 start : datetime
825 Timestamp for the start of the session.
826 end : datetime
827 Timestamp for the end of the session, or None if IPython crashed.
828 num_cmds : int
829 Number of commands run, or None if IPython crashed.
830 remark : str
831 A manually set description.
832 """
833 if session <= 0:
834 session += self.session_number
836 return super(HistoryManager, self).get_session_info(session=session)
838 @catch_corrupt_db
839 def get_tail(
840 self,
841 n: int = 10,
842 raw: bool = True,
843 output: bool = False,
844 include_latest: bool = False,
845 ) -> Iterable[tuple[int, int, InOrInOut]]:
846 """Get the last n lines from the history database.
848 Most recent entry last.
850 Completion will be reordered so that that the last ones are when
851 possible from current session.
853 Parameters
854 ----------
855 n : int
856 The number of lines to get
857 raw, output : bool
858 See :meth:`get_range`
859 include_latest : bool
860 If False (default), n+1 lines are fetched, and the latest one
861 is discarded. This is intended to be used where the function
862 is called by a user command, which it should not return.
864 Returns
865 -------
866 Tuples as :meth:`get_range`
867 """
868 self.writeout_cache()
869 if not include_latest:
870 n += 1
871 # cursor/line/entry
872 this_cur = list(
873 self._run_sql(
874 "WHERE session == ? ORDER BY line DESC LIMIT ? ",
875 (self.session_number, n),
876 raw=raw,
877 output=output,
878 )
879 )
880 other_cur = list(
881 self._run_sql(
882 "WHERE session != ? ORDER BY session DESC, line DESC LIMIT ?",
883 (self.session_number, n),
884 raw=raw,
885 output=output,
886 )
887 )
889 everything: list[tuple[int, int, InOrInOut]] = this_cur + other_cur
891 everything = everything[:n]
893 if not include_latest:
894 return list(everything)[:0:-1]
895 return list(everything)[::-1]
897 def _get_range_session(
898 self,
899 start: int = 1,
900 stop: Optional[int] = None,
901 raw: bool = True,
902 output: bool = False,
903 ) -> Iterable[tuple[int, int, InOrInOut]]:
904 """Get input and output history from the current session. Called by
905 get_range, and takes similar parameters."""
906 input_hist = self.input_hist_raw if raw else self.input_hist_parsed
908 n = len(input_hist)
909 if start < 0:
910 start += n
911 if not stop or (stop > n):
912 stop = n
913 elif stop < 0:
914 stop += n
915 line: InOrInOut
916 for i in range(start, stop):
917 if output:
918 line = (input_hist[i], self.output_hist_reprs.get(i))
919 else:
920 line = input_hist[i]
921 yield (0, i, line)
923 def get_range(
924 self,
925 session: int = 0,
926 start: int = 1,
927 stop: Optional[int] = None,
928 raw: bool = True,
929 output: bool = False,
930 ) -> Iterable[tuple[int, int, InOrInOut]]:
931 """Retrieve input by session.
933 Parameters
934 ----------
935 session : int
936 Session number to retrieve. The current session is 0, and negative
937 numbers count back from current session, so -1 is previous session.
938 start : int
939 First line to retrieve.
940 stop : int
941 End of line range (excluded from output itself). If None, retrieve
942 to the end of the session.
943 raw : bool
944 If True, return untranslated input
945 output : bool
946 If True, attempt to include output. This will be 'real' Python
947 objects for the current session, or text reprs from previous
948 sessions if db_log_output was enabled at the time. Where no output
949 is found, None is used.
951 Returns
952 -------
953 entries
954 An iterator over the desired lines. Each line is a 3-tuple, either
955 (session, line, input) if output is False, or
956 (session, line, (input, output)) if output is True.
957 """
958 if session <= 0:
959 session += self.session_number
960 if session == self.session_number: # Current session
961 return self._get_range_session(start, stop, raw, output)
962 return super(HistoryManager, self).get_range(session, start, stop, raw, output)
964 ## ----------------------------
965 ## Methods for storing history:
966 ## ----------------------------
967 def store_inputs(
968 self, line_num: int, source: str, source_raw: Optional[str] = None
969 ) -> None:
970 """Store source and raw input in history and create input cache
971 variables ``_i*``.
973 Parameters
974 ----------
975 line_num : int
976 The prompt number of this input.
977 source : str
978 Python input.
979 source_raw : str, optional
980 If given, this is the raw input without any IPython transformations
981 applied to it. If not given, ``source`` is used.
982 """
983 if source_raw is None:
984 source_raw = source
985 source = source.rstrip("\n")
986 source_raw = source_raw.rstrip("\n")
988 # do not store exit/quit commands
989 if self._exit_re.match(source_raw.strip()):
990 return
992 self.input_hist_parsed.append(source)
993 self.input_hist_raw.append(source_raw)
995 with self.db_input_cache_lock:
996 self.db_input_cache.append((line_num, source, source_raw))
997 # Trigger to flush cache and write to DB.
998 if len(self.db_input_cache) >= self.db_cache_size:
999 if self.using_thread:
1000 self._restart_thread_if_stopped()
1001 if self.save_flag is not None:
1002 self.save_flag.set()
1004 # update the auto _i variables
1005 self._iii = self._ii
1006 self._ii = self._i
1007 self._i = self._i00
1008 self._i00 = source_raw
1010 # hackish access to user namespace to create _i1,_i2... dynamically
1011 new_i = "_i%s" % line_num
1012 to_main = {"_i": self._i, "_ii": self._ii, "_iii": self._iii, new_i: self._i00}
1014 if self.shell is not None:
1015 self.shell.push(to_main, interactive=False)
1017 def store_output(self, line_num: int) -> None:
1018 """If database output logging is enabled, this saves all the
1019 outputs from the indicated prompt number to the database. It's
1020 called by run_cell after code has been executed.
1022 Parameters
1023 ----------
1024 line_num : int
1025 The line number from which to save outputs
1026 """
1027 if (not self.db_log_output) or (line_num not in self.output_hist_reprs):
1028 return
1029 lnum: int = line_num
1030 output = self.output_hist_reprs[line_num]
1032 with self.db_output_cache_lock:
1033 self.db_output_cache.append((line_num, output))
1034 if self.db_cache_size <= 1 and self.using_thread:
1035 self._restart_thread_if_stopped()
1036 if self.save_flag is not None:
1037 self.save_flag.set()
1039 def _writeout_input_cache(self, conn: sqlite3.Connection) -> None:
1040 with conn:
1041 for line in self.db_input_cache:
1042 conn.execute(
1043 "INSERT INTO history VALUES (?, ?, ?, ?)",
1044 (self.session_number,) + line,
1045 )
1047 def _writeout_output_cache(self, conn: sqlite3.Connection) -> None:
1048 with conn:
1049 for line in self.db_output_cache:
1050 conn.execute(
1051 "INSERT INTO output_history VALUES (?, ?, ?)",
1052 (self.session_number,) + line,
1053 )
1055 @only_when_enabled
1056 def writeout_cache(self, conn: Optional[sqlite3.Connection] = None) -> None:
1057 """Write any entries in the cache to the database."""
1058 if conn is None:
1059 conn = self.db
1061 with self.db_input_cache_lock:
1062 try:
1063 self._writeout_input_cache(conn)
1064 except sqlite3.IntegrityError:
1065 self.new_session(conn)
1066 print(
1067 "ERROR! Session/line number was not unique in",
1068 "database. History logging moved to new session",
1069 self.session_number,
1070 )
1071 try:
1072 # Try writing to the new session. If this fails, don't
1073 # recurse
1074 self._writeout_input_cache(conn)
1075 except sqlite3.IntegrityError:
1076 pass
1077 finally:
1078 self.db_input_cache = []
1080 with self.db_output_cache_lock:
1081 try:
1082 self._writeout_output_cache(conn)
1083 except sqlite3.IntegrityError:
1084 print(
1085 "!! Session/line number for output was not unique",
1086 "in database. Output will not be stored.",
1087 )
1088 finally:
1089 self.db_output_cache = []
1092if hasattr(os, "register_at_fork"):
1093 os.register_at_fork(before=HistoryManager._stop_thread)
1096from collections.abc import Callable, Iterator
1097from weakref import ReferenceType
1100@contextmanager
1101def hold(ref: ReferenceType[HistoryManager]) -> Iterator[ReferenceType[HistoryManager]]:
1102 """
1103 Context manger that hold a reference to a weak ref to make sure it
1104 is not GC'd during it's context.
1105 """
1106 r = ref()
1107 yield ref
1108 del r
1111class HistorySavingThread(threading.Thread):
1112 """This thread takes care of writing history to the database, so that
1113 the UI isn't held up while that happens.
1115 It waits for the HistoryManager's save_flag to be set, then writes out
1116 the history cache. The main thread is responsible for setting the flag when
1117 the cache size reaches a defined threshold."""
1119 save_flag: threading.Event
1120 daemon: bool = True
1121 _stop_now: bool = False
1122 enabled: bool = True
1123 history_manager: ref[HistoryManager]
1124 _stopped = False
1126 def __init__(self, history_manager: HistoryManager) -> None:
1127 super(HistorySavingThread, self).__init__(name="IPythonHistorySavingThread")
1128 self.history_manager = ref(history_manager)
1129 self.enabled = history_manager.enabled
1130 self.save_flag = threading.Event()
1132 @only_when_enabled
1133 def run(self) -> None:
1134 atexit.register(self.stop)
1135 # We need a separate db connection per thread:
1136 try:
1137 hm: ReferenceType[HistoryManager]
1138 with hold(self.history_manager) as hm:
1139 if hm() is not None:
1140 self.db = sqlite3.connect(
1141 str(hm().hist_file), # type: ignore [union-attr]
1142 **cast(dict[str, t.Any], hm().connection_options), # type: ignore [union-attr]
1143 )
1144 while True:
1145 self.save_flag.wait()
1146 with hold(self.history_manager) as hm:
1147 if hm() is None:
1148 self._stop_now = True
1149 if self._stop_now:
1150 self.db.close()
1151 return
1152 self.save_flag.clear()
1153 if hm() is not None:
1154 hm().writeout_cache(self.db) # type: ignore [union-attr]
1156 except Exception as e:
1157 print(
1158 (
1159 "The history saving thread hit an unexpected error (%s)."
1160 "History will not be written to the database."
1161 )
1162 % repr(e)
1163 )
1164 finally:
1165 atexit.unregister(self.stop)
1167 def stop(self) -> None:
1168 """This can be called from the main thread to safely stop this thread.
1170 Note that it does not attempt to write out remaining history before
1171 exiting. That should be done by calling the HistoryManager's
1172 end_session method."""
1173 if self._stopped:
1174 return
1175 self._stop_now = True
1177 self.save_flag.set()
1178 self._stopped = True
1179 if self != threading.current_thread():
1180 self.join()
1182 def __del__(self) -> None:
1183 self.stop()
1186# To match, e.g. ~5/8-~2/3, or ~4 (without trailing slash for full session)
1187# Session numbers: ~N or N/
1188# Line numbers: N (just digits, no ~)
1189# Range syntax: 4-6 (with end) or 4- (without end, means "onward")
1190range_re = re.compile(
1191 r"""
1192((?P<startsess>(?:~?\d+/)))?
1193(?P<start>\d+)?
1194((?P<sep>[\-:])
1195 ((?P<endsess>(?:~?\d+/)))?
1196 (?P<end>\d*))?
1197$""",
1198 re.VERBOSE,
1199)
1202def extract_hist_ranges(ranges_str: str) -> Iterable[tuple[int, int, Optional[int]]]:
1203 """Turn a string of history ranges into 3-tuples of (session, start, stop).
1205 Empty string results in a `[(0, 1, None)]`, i.e. "everything from current
1206 session".
1208 Examples
1209 --------
1210 >>> list(extract_hist_ranges("~8/5-~7/4 2"))
1211 [(-8, 5, None), (-7, 1, 5), (0, 2, 3)]
1212 >>> list(extract_hist_ranges("~4/"))
1213 [(-4, 1, None)]
1214 >>> list(extract_hist_ranges("4-"))
1215 [(0, 4, None)]
1216 >>> list(extract_hist_ranges("~4/4-"))
1217 [(-4, 4, None)]
1218 """
1219 if ranges_str == "":
1220 yield (0, 1, None) # Everything from current session
1221 return
1223 for range_str in ranges_str.split():
1224 rmatch = range_re.match(range_str)
1225 if not rmatch:
1226 continue
1227 start = rmatch.group("start")
1228 sep = rmatch.group("sep")
1229 if start:
1230 start = int(start)
1231 end = rmatch.group("end")
1232 if sep == "-":
1233 end = (int(end) + 1) if end else None
1234 else:
1235 end = int(end) if end else start + 1
1236 else:
1237 if not rmatch.group("startsess"):
1238 continue
1239 start = 1
1240 end = None
1241 startsess = rmatch.group("startsess") or "0"
1242 endsess = rmatch.group("endsess") or startsess
1243 startsess = startsess.rstrip("/")
1244 endsess = endsess.rstrip("/")
1245 startsess = int(startsess.replace("~", "-"))
1246 endsess = int(endsess.replace("~", "-"))
1247 assert endsess >= startsess, "start session must be earlier than end session"
1249 if endsess == startsess:
1250 yield (startsess, start, end)
1251 continue
1252 # Multiple sessions in one range:
1253 yield (startsess, start, None)
1254 for sess in range(startsess + 1, endsess):
1255 yield (sess, 1, None)
1256 yield (endsess, 1, end)
1259def _format_lineno(session: int, line: int) -> str:
1260 """Helper function to format line numbers properly."""
1261 if session == 0:
1262 return str(line)
1263 return "%s#%s" % (session, line)