1from __future__ import annotations
2
3import sys
4import typing
5
6if sys.version_info >= (3, 10): # pragma: no cover
7 from typing import ParamSpec
8else: # pragma: no cover
9 from typing_extensions import ParamSpec
10
11from starlette._utils import is_async_callable
12from starlette.concurrency import run_in_threadpool
13
# Parameter specification used in the task signatures in this module: the
# callable is typed as ``Callable[P, Any]`` and the forwarded ``*args`` /
# ``**kwargs`` as ``P.args`` / ``P.kwargs``, tying them to that callable.
P = ParamSpec("P")
15
16
class BackgroundTask:
    """A callable bundled with the arguments it should later be invoked with.

    Awaiting an instance runs the stored callable with the stored positional
    and keyword arguments.
    """

    def __init__(
        self,
        func: typing.Callable[P, typing.Any],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        """Store ``func`` and its arguments; nothing is executed here.

        Args:
            func: The callable to run later.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # Decided once at construction time so that __call__ only has to
        # branch on a stored flag.
        self.is_async = is_async_callable(func)

    async def __call__(self) -> None:
        """Run the stored callable; its return value is discarded."""
        if not self.is_async:
            # Non-async callables are dispatched through run_in_threadpool
            # instead of being called inline.
            await run_in_threadpool(self.func, *self.args, **self.kwargs)
            return

        await self.func(*self.args, **self.kwargs)
29
30
class BackgroundTasks(BackgroundTask):
    """An ordered collection of background tasks that is itself awaitable.

    The class derives from ``BackgroundTask`` but does not call the base
    initializer, so instances carry only the ``tasks`` list and none of the
    ``func`` / ``args`` / ``kwargs`` / ``is_async`` attributes.
    """

    def __init__(self, tasks: typing.Sequence[BackgroundTask] | None = None):
        """Start from a copy of ``tasks``, or an empty list if none are given.

        Args:
            tasks: Optional initial tasks. ``None`` and an empty sequence
                both produce an empty list.
        """
        if tasks:
            # Copy so that later appends never touch the caller's sequence.
            self.tasks = list(tasks)
        else:
            self.tasks = []

    def add_task(
        self,
        func: typing.Callable[P, typing.Any],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        """Wrap ``func`` and its arguments in a ``BackgroundTask`` and queue it.

        Args:
            func: The callable to run later.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.
        """
        self.tasks.append(BackgroundTask(func, *args, **kwargs))

    async def __call__(self) -> None:
        """Await every queued task one after another, in insertion order."""
        for queued in self.tasks:
            await queued()