1""" 
    2Implementations for the history of a `Buffer`. 
    3 
    4NOTE: There is no `DynamicHistory`: 
    5      This doesn't work well, because the `Buffer` needs to be able to attach 
    6      an event handler to the event when a history entry is loaded. This 
    7      loading can be done asynchronously and making the history swappable would 
    8      probably break this. 
    9""" 
    10 
    11from __future__ import annotations 
    12 
    13import datetime 
    14import os 
    15import threading 
    16from abc import ABCMeta, abstractmethod 
    17from asyncio import get_running_loop 
    18from typing import AsyncGenerator, Iterable, Sequence, Union 
    19 
    20__all__ = [ 
    21    "History", 
    22    "ThreadedHistory", 
    23    "DummyHistory", 
    24    "FileHistory", 
    25    "InMemoryHistory", 
    26] 
    27 
    28 
    29class History(metaclass=ABCMeta): 
    30    """ 
    31    Base ``History`` class. 
    32 
    33    This also includes abstract methods for loading/storing history. 
    34    """ 
    35 
    36    def __init__(self) -> None: 
    37        # In memory storage for strings. 
    38        self._loaded = False 
    39 
    40        # History that's loaded already, in reverse order. Latest, most recent 
    41        # item first. 
    42        self._loaded_strings: list[str] = [] 
    43 
    44    # 
    45    # Methods expected by `Buffer`. 
    46    # 
    47 
    48    async def load(self) -> AsyncGenerator[str, None]: 
    49        """ 
    50        Load the history and yield all the entries in reverse order (latest, 
    51        most recent history entry first). 
    52 
    53        This method can be called multiple times from the `Buffer` to 
    54        repopulate the history when prompting for a new input. So we are 
    55        responsible here for both caching, and making sure that strings that 
    56        were were appended to the history will be incorporated next time this 
    57        method is called. 
    58        """ 
    59        if not self._loaded: 
    60            self._loaded_strings = list(self.load_history_strings()) 
    61            self._loaded = True 
    62 
    63        for item in self._loaded_strings: 
    64            yield item 
    65 
    66    def get_strings(self) -> list[str]: 
    67        """ 
    68        Get the strings from the history that are loaded so far. 
    69        (In order. Oldest item first.) 
    70        """ 
    71        return self._loaded_strings[::-1] 
    72 
    73    def append_string(self, string: str) -> None: 
    74        "Add string to the history." 
    75        self._loaded_strings.insert(0, string) 
    76        self.store_string(string) 
    77 
    78    # 
    79    # Implementation for specific backends. 
    80    # 
    81 
    82    @abstractmethod 
    83    def load_history_strings(self) -> Iterable[str]: 
    84        """ 
    85        This should be a generator that yields `str` instances. 
    86 
    87        It should yield the most recent items first, because they are the most 
    88        important. (The history can already be used, even when it's only 
    89        partially loaded.) 
    90        """ 
    91        while False: 
    92            yield 
    93 
    94    @abstractmethod 
    95    def store_string(self, string: str) -> None: 
    96        """ 
    97        Store the string in persistent storage. 
    98        """ 
    99 
    100 
    101class ThreadedHistory(History): 
    102    """ 
    103    Wrapper around `History` implementations that run the `load()` generator in 
    104    a thread. 
    105 
    106    Use this to increase the start-up time of prompt_toolkit applications. 
    107    History entries are available as soon as they are loaded. We don't have to 
    108    wait for everything to be loaded. 
    109    """ 
    110 
    111    def __init__(self, history: History) -> None: 
    112        super().__init__() 
    113 
    114        self.history = history 
    115 
    116        self._load_thread: threading.Thread | None = None 
    117 
    118        # Lock for accessing/manipulating `_loaded_strings` and `_loaded` 
    119        # together in a consistent state. 
    120        self._lock = threading.Lock() 
    121 
    122        # Events created by each `load()` call. Used to wait for new history 
    123        # entries from the loader thread. 
    124        self._string_load_events: list[threading.Event] = [] 
    125 
    126    async def load(self) -> AsyncGenerator[str, None]: 
    127        """ 
    128        Like `History.load(), but call `self.load_history_strings()` in a 
    129        background thread. 
    130        """ 
    131        # Start the load thread, if this is called for the first time. 
    132        if not self._load_thread: 
    133            self._load_thread = threading.Thread( 
    134                target=self._in_load_thread, 
    135                daemon=True, 
    136            ) 
    137            self._load_thread.start() 
    138 
    139        # Consume the `_loaded_strings` list, using asyncio. 
    140        loop = get_running_loop() 
    141 
    142        # Create threading Event so that we can wait for new items. 
    143        event = threading.Event() 
    144        event.set() 
    145        self._string_load_events.append(event) 
    146 
    147        items_yielded = 0 
    148 
    149        try: 
    150            while True: 
    151                # Wait for new items to be available. 
    152                # (Use a timeout, because the executor thread is not a daemon 
    153                # thread. The "slow-history.py" example would otherwise hang if 
    154                # Control-C is pressed before the history is fully loaded, 
    155                # because there's still this non-daemon executor thread waiting 
    156                # for this event.) 
    157                got_timeout = await loop.run_in_executor( 
    158                    None, lambda: event.wait(timeout=0.5) 
    159                ) 
    160                if not got_timeout: 
    161                    continue 
    162 
    163                # Read new items (in lock). 
    164                def in_executor() -> tuple[list[str], bool]: 
    165                    with self._lock: 
    166                        new_items = self._loaded_strings[items_yielded:] 
    167                        done = self._loaded 
    168                        event.clear() 
    169                    return new_items, done 
    170 
    171                new_items, done = await loop.run_in_executor(None, in_executor) 
    172 
    173                items_yielded += len(new_items) 
    174 
    175                for item in new_items: 
    176                    yield item 
    177 
    178                if done: 
    179                    break 
    180        finally: 
    181            self._string_load_events.remove(event) 
    182 
    183    def _in_load_thread(self) -> None: 
    184        try: 
    185            # Start with an empty list. In case `append_string()` was called 
    186            # before `load()` happened. Then `.store_string()` will have 
    187            # written these entries back to disk and we will reload it. 
    188            self._loaded_strings = [] 
    189 
    190            for item in self.history.load_history_strings(): 
    191                with self._lock: 
    192                    self._loaded_strings.append(item) 
    193 
    194                for event in self._string_load_events: 
    195                    event.set() 
    196        finally: 
    197            with self._lock: 
    198                self._loaded = True 
    199            for event in self._string_load_events: 
    200                event.set() 
    201 
    202    def append_string(self, string: str) -> None: 
    203        with self._lock: 
    204            self._loaded_strings.insert(0, string) 
    205        self.store_string(string) 
    206 
    207    # All of the following are proxied to `self.history`. 
    208 
    209    def load_history_strings(self) -> Iterable[str]: 
    210        return self.history.load_history_strings() 
    211 
    212    def store_string(self, string: str) -> None: 
    213        self.history.store_string(string) 
    214 
    215    def __repr__(self) -> str: 
    216        return f"ThreadedHistory({self.history!r})" 
    217 
    218 
    219class InMemoryHistory(History): 
    220    """ 
    221    :class:`.History` class that keeps a list of all strings in memory. 
    222 
    223    In order to prepopulate the history, it's possible to call either 
    224    `append_string` for all items or pass a list of strings to `__init__` here. 
    225    """ 
    226 
    227    def __init__(self, history_strings: Sequence[str] | None = None) -> None: 
    228        super().__init__() 
    229        # Emulating disk storage. 
    230        if history_strings is None: 
    231            self._storage = [] 
    232        else: 
    233            self._storage = list(history_strings) 
    234 
    235    def load_history_strings(self) -> Iterable[str]: 
    236        yield from self._storage[::-1] 
    237 
    238    def store_string(self, string: str) -> None: 
    239        self._storage.append(string) 
    240 
    241 
    242class DummyHistory(History): 
    243    """ 
    244    :class:`.History` object that doesn't remember anything. 
    245    """ 
    246 
    247    def load_history_strings(self) -> Iterable[str]: 
    248        return [] 
    249 
    250    def store_string(self, string: str) -> None: 
    251        pass 
    252 
    253    def append_string(self, string: str) -> None: 
    254        # Don't remember this. 
    255        pass 
    256 
    257 
    258_StrOrBytesPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"] 
    259 
    260 
    261class FileHistory(History): 
    262    """ 
    263    :class:`.History` class that stores all strings in a file. 
    264    """ 
    265 
    266    def __init__(self, filename: _StrOrBytesPath) -> None: 
    267        self.filename = filename 
    268        super().__init__() 
    269 
    270    def load_history_strings(self) -> Iterable[str]: 
    271        strings: list[str] = [] 
    272        lines: list[str] = [] 
    273 
    274        def add() -> None: 
    275            if lines: 
    276                # Join and drop trailing newline. 
    277                string = "".join(lines)[:-1] 
    278 
    279                strings.append(string) 
    280 
    281        if os.path.exists(self.filename): 
    282            with open(self.filename, "rb") as f: 
    283                for line_bytes in f: 
    284                    line = line_bytes.decode("utf-8", errors="replace") 
    285 
    286                    if line.startswith("+"): 
    287                        lines.append(line[1:]) 
    288                    else: 
    289                        add() 
    290                        lines = [] 
    291 
    292                add() 
    293 
    294        # Reverse the order, because newest items have to go first. 
    295        return reversed(strings) 
    296 
    297    def store_string(self, string: str) -> None: 
    298        # Save to file. 
    299        with open(self.filename, "ab") as f: 
    300 
    301            def write(t: str) -> None: 
    302                f.write(t.encode("utf-8")) 
    303 
    304            write(f"\n# {datetime.datetime.now()}\n") 
    305            for line in string.split("\n"): 
    306                write(f"+{line}\n")