1"""Tools for managing kernel specs"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7import json
8import os
9import re
10import shutil
11import typing as t
12import warnings
13
14from jupyter_core.paths import SYSTEM_JUPYTER_PATH, jupyter_data_dir, jupyter_path
15from traitlets import Bool, CaselessStrEnum, Dict, HasTraits, List, Set, Type, Unicode, observe
16from traitlets.config import LoggingConfigurable
17
18from .provisioning import KernelProvisionerFactory as KPF # noqa
19
20pjoin = os.path.join
21
22NATIVE_KERNEL_NAME = "python3"
23
24
25class KernelSpec(HasTraits):
26 """A kernel spec model object."""
27
28 argv: List[str] = List()
29 name = Unicode()
30 mimetype = Unicode()
31 display_name = Unicode()
32 language = Unicode()
33 env = Dict()
34 resource_dir = Unicode()
35 interrupt_mode = CaselessStrEnum(["message", "signal"], default_value="signal")
36 metadata = Dict()
37
38 @classmethod
39 def from_resource_dir(cls: type[KernelSpec], resource_dir: str) -> KernelSpec:
40 """Create a KernelSpec object by reading kernel.json
41
42 Pass the path to the *directory* containing kernel.json.
43 """
44 kernel_file = pjoin(resource_dir, "kernel.json")
45 with open(kernel_file, encoding="utf-8") as f:
46 kernel_dict = json.load(f)
47 return cls(resource_dir=resource_dir, **kernel_dict)
48
49 def to_dict(self) -> dict[str, t.Any]:
50 """Convert the kernel spec to a dict."""
51 d = {
52 "argv": self.argv,
53 "env": self.env,
54 "display_name": self.display_name,
55 "language": self.language,
56 "interrupt_mode": self.interrupt_mode,
57 "metadata": self.metadata,
58 }
59
60 return d
61
62 def to_json(self) -> str:
63 """Serialise this kernelspec to a JSON object.
64
65 Returns a string.
66 """
67 return json.dumps(self.to_dict())
68
69
70_kernel_name_pat = re.compile(r"^[a-z0-9._\-]+$", re.IGNORECASE)
71
72
73def _is_valid_kernel_name(name: str) -> t.Any:
74 """Check that a kernel name is valid."""
75 # quote is not unicode-safe on Python 2
76 return _kernel_name_pat.match(name)
77
78
79_kernel_name_description = (
80 "Kernel names can only contain ASCII letters and numbers and these separators:"
81 " - . _ (hyphen, period, and underscore)."
82)
83
84
85def _is_kernel_dir(path: str) -> bool:
86 """Is ``path`` a kernel directory?"""
87 return os.path.isdir(path) and os.path.isfile(pjoin(path, "kernel.json"))
88
89
90def _list_kernels_in(dir: str | None) -> dict[str, str]:
91 """Return a mapping of kernel names to resource directories from dir.
92
93 If dir is None or does not exist, returns an empty dict.
94 """
95 if dir is None or not os.path.isdir(dir):
96 return {}
97 kernels = {}
98 for f in os.listdir(dir):
99 path = pjoin(dir, f)
100 if not _is_kernel_dir(path):
101 continue
102 key = f.lower()
103 if not _is_valid_kernel_name(key):
104 warnings.warn(
105 f"Invalid kernelspec directory name ({_kernel_name_description}): {path}",
106 stacklevel=3,
107 )
108 kernels[key] = path
109 return kernels
110
111
112class NoSuchKernel(KeyError): # noqa
113 """An error raised when there is no kernel of a give name."""
114
115 def __init__(self, name: str) -> None:
116 """Initialize the error."""
117 self.name = name
118
119 def __str__(self) -> str:
120 return f"No such kernel named {self.name}"
121
122
123class KernelSpecManager(LoggingConfigurable):
124 """A manager for kernel specs."""
125
126 kernel_spec_class = Type(
127 KernelSpec,
128 config=True,
129 help="""The kernel spec class. This is configurable to allow
130 subclassing of the KernelSpecManager for customized behavior.
131 """,
132 )
133
134 ensure_native_kernel = Bool(
135 True,
136 config=True,
137 help="""If there is no Python kernelspec registered and the IPython
138 kernel is available, ensure it is added to the spec list.
139 """,
140 )
141
142 data_dir = Unicode()
143
144 def _data_dir_default(self) -> str:
145 return jupyter_data_dir()
146
147 user_kernel_dir = Unicode()
148
149 def _user_kernel_dir_default(self) -> str:
150 return pjoin(self.data_dir, "kernels")
151
152 whitelist = Set(
153 config=True,
154 help="""Deprecated, use `KernelSpecManager.allowed_kernelspecs`
155 """,
156 )
157 allowed_kernelspecs = Set(
158 config=True,
159 help="""List of allowed kernel names.
160
161 By default, all installed kernels are allowed.
162 """,
163 )
164 kernel_dirs: List[str] = List(
165 help="List of kernel directories to search. Later ones take priority over earlier."
166 )
167
168 _deprecated_aliases = {
169 "whitelist": ("allowed_kernelspecs", "7.0"),
170 }
171
172 # Method copied from
173 # https://github.com/jupyterhub/jupyterhub/blob/d1a85e53dccfc7b1dd81b0c1985d158cc6b61820/jupyterhub/auth.py#L143-L161
174 @observe(*list(_deprecated_aliases))
175 def _deprecated_trait(self, change: t.Any) -> None:
176 """observer for deprecated traits"""
177 old_attr = change.name
178 new_attr, version = self._deprecated_aliases[old_attr]
179 new_value = getattr(self, new_attr)
180 if new_value != change.new:
181 # only warn if different
182 # protects backward-compatible config from warnings
183 # if they set the same value under both names
184 self.log.warning(
185 f"{self.__class__.__name__}.{old_attr} is deprecated in jupyter_client "
186 f"{version}, use {self.__class__.__name__}.{new_attr} instead"
187 )
188 setattr(self, new_attr, change.new)
189
190 def _kernel_dirs_default(self) -> list[str]:
191 dirs = jupyter_path("kernels")
192 # At some point, we should stop adding .ipython/kernels to the path,
193 # but the cost to keeping it is very small.
194 try:
195 # this should always be valid on IPython 3+
196 from IPython.paths import get_ipython_dir
197
198 dirs.append(os.path.join(get_ipython_dir(), "kernels"))
199 except ModuleNotFoundError:
200 pass
201 return dirs
202
203 def find_kernel_specs(self) -> dict[str, str]:
204 """Returns a dict mapping kernel names to resource directories."""
205 d = {}
206 for kernel_dir in self.kernel_dirs:
207 kernels = _list_kernels_in(kernel_dir)
208 for kname, spec in kernels.items():
209 if kname not in d:
210 self.log.debug("Found kernel %s in %s", kname, kernel_dir)
211 d[kname] = spec
212
213 if self.ensure_native_kernel and NATIVE_KERNEL_NAME not in d:
214 try:
215 from ipykernel.kernelspec import RESOURCES
216
217 self.log.debug(
218 "Native kernel (%s) available from %s",
219 NATIVE_KERNEL_NAME,
220 RESOURCES,
221 )
222 d[NATIVE_KERNEL_NAME] = RESOURCES
223 except ImportError:
224 self.log.warning("Native kernel (%s) is not available", NATIVE_KERNEL_NAME)
225
226 if self.allowed_kernelspecs:
227 # filter if there's an allow list
228 d = {name: spec for name, spec in d.items() if name in self.allowed_kernelspecs}
229 return d
230 # TODO: Caching?
231
232 def _get_kernel_spec_by_name(self, kernel_name: str, resource_dir: str) -> KernelSpec:
233 """Returns a :class:`KernelSpec` instance for a given kernel_name
234 and resource_dir.
235 """
236 kspec = None
237 if kernel_name == NATIVE_KERNEL_NAME:
238 try:
239 from ipykernel.kernelspec import RESOURCES, get_kernel_dict
240 except ImportError:
241 # It should be impossible to reach this, but let's play it safe
242 pass
243 else:
244 if resource_dir == RESOURCES:
245 kdict = get_kernel_dict()
246 kspec = self.kernel_spec_class(resource_dir=resource_dir, **kdict)
247 if not kspec:
248 kspec = self.kernel_spec_class.from_resource_dir(resource_dir)
249
250 if not KPF.instance(parent=self.parent).is_provisioner_available(kspec):
251 raise NoSuchKernel(kernel_name)
252
253 return kspec
254
255 def _find_spec_directory(self, kernel_name: str) -> str | None:
256 """Find the resource directory of a named kernel spec"""
257 for kernel_dir in [kd for kd in self.kernel_dirs if os.path.isdir(kd)]:
258 files = os.listdir(kernel_dir)
259 for f in files:
260 path = pjoin(kernel_dir, f)
261 if f.lower() == kernel_name and _is_kernel_dir(path):
262 return path
263
264 if kernel_name == NATIVE_KERNEL_NAME:
265 try:
266 from ipykernel.kernelspec import RESOURCES
267 except ImportError:
268 pass
269 else:
270 return RESOURCES
271 return None
272
273 def get_kernel_spec(self, kernel_name: str) -> KernelSpec:
274 """Returns a :class:`KernelSpec` instance for the given kernel_name.
275
276 Raises :exc:`NoSuchKernel` if the given kernel name is not found.
277 """
278 if not _is_valid_kernel_name(kernel_name):
279 self.log.warning(
280 f"Kernelspec name {kernel_name} is invalid: {_kernel_name_description}"
281 )
282
283 resource_dir = self._find_spec_directory(kernel_name.lower())
284 if resource_dir is None:
285 raise NoSuchKernel(kernel_name)
286
287 return self._get_kernel_spec_by_name(kernel_name, resource_dir)
288
289 def get_all_specs(self) -> dict[str, t.Any]:
290 """Returns a dict mapping kernel names to kernelspecs.
291
292 Returns a dict of the form::
293
294 {
295 'kernel_name': {
296 'resource_dir': '/path/to/kernel_name',
297 'spec': {"the spec itself": ...}
298 },
299 ...
300 }
301 """
302 d = self.find_kernel_specs()
303 res = {}
304 for kname, resource_dir in d.items():
305 try:
306 if self.__class__ is KernelSpecManager:
307 spec = self._get_kernel_spec_by_name(kname, resource_dir)
308 else:
309 # avoid calling private methods in subclasses,
310 # which may have overridden find_kernel_specs
311 # and get_kernel_spec, but not the newer get_all_specs
312 spec = self.get_kernel_spec(kname)
313
314 res[kname] = {"resource_dir": resource_dir, "spec": spec.to_dict()}
315 except NoSuchKernel:
316 pass # The appropriate warning has already been logged
317 except Exception:
318 self.log.warning("Error loading kernelspec %r", kname, exc_info=True)
319 return res
320
321 def remove_kernel_spec(self, name: str) -> str:
322 """Remove a kernel spec directory by name.
323
324 Returns the path that was deleted.
325 """
326 save_native = self.ensure_native_kernel
327 try:
328 self.ensure_native_kernel = False
329 specs = self.find_kernel_specs()
330 finally:
331 self.ensure_native_kernel = save_native
332 spec_dir = specs[name]
333 self.log.debug("Removing %s", spec_dir)
334 if os.path.islink(spec_dir):
335 os.remove(spec_dir)
336 else:
337 shutil.rmtree(spec_dir)
338 return spec_dir
339
340 def _get_destination_dir(
341 self, kernel_name: str, user: bool = False, prefix: str | None = None
342 ) -> str:
343 if user:
344 return os.path.join(self.user_kernel_dir, kernel_name)
345 elif prefix:
346 return os.path.join(os.path.abspath(prefix), "share", "jupyter", "kernels", kernel_name)
347 else:
348 return os.path.join(SYSTEM_JUPYTER_PATH[0], "kernels", kernel_name)
349
350 def install_kernel_spec(
351 self,
352 source_dir: str,
353 kernel_name: str | None = None,
354 user: bool = False,
355 replace: bool | None = None,
356 prefix: str | None = None,
357 ) -> str:
358 """Install a kernel spec by copying its directory.
359
360 If ``kernel_name`` is not given, the basename of ``source_dir`` will
361 be used.
362
363 If ``user`` is False, it will attempt to install into the systemwide
364 kernel registry. If the process does not have appropriate permissions,
365 an :exc:`OSError` will be raised.
366
367 If ``prefix`` is given, the kernelspec will be installed to
368 PREFIX/share/jupyter/kernels/KERNEL_NAME. This can be sys.prefix
369 for installation inside virtual or conda envs.
370 """
371 source_dir = source_dir.rstrip("/\\")
372 if not kernel_name:
373 kernel_name = os.path.basename(source_dir)
374 kernel_name = kernel_name.lower()
375 if not _is_valid_kernel_name(kernel_name):
376 msg = f"Invalid kernel name {kernel_name!r}. {_kernel_name_description}"
377 raise ValueError(msg)
378
379 if user and prefix:
380 msg = "Can't specify both user and prefix. Please choose one or the other."
381 raise ValueError(msg)
382
383 if replace is not None:
384 warnings.warn(
385 "replace is ignored. Installing a kernelspec always replaces an existing "
386 "installation",
387 DeprecationWarning,
388 stacklevel=2,
389 )
390
391 destination = self._get_destination_dir(kernel_name, user=user, prefix=prefix)
392 self.log.debug("Installing kernelspec in %s", destination)
393
394 kernel_dir = os.path.dirname(destination)
395 if kernel_dir not in self.kernel_dirs:
396 self.log.warning(
397 "Installing to %s, which is not in %s. The kernelspec may not be found.",
398 kernel_dir,
399 self.kernel_dirs,
400 )
401
402 if os.path.isdir(destination):
403 self.log.info("Removing existing kernelspec in %s", destination)
404 shutil.rmtree(destination)
405
406 shutil.copytree(source_dir, destination)
407 self.log.info("Installed kernelspec %s in %s", kernel_name, destination)
408 return destination
409
410 def install_native_kernel_spec(self, user: bool = False) -> None:
411 """DEPRECATED: Use ipykernel.kernelspec.install"""
412 warnings.warn(
413 "install_native_kernel_spec is deprecated. Use ipykernel.kernelspec import install.",
414 stacklevel=2,
415 )
416 from ipykernel.kernelspec import install
417
418 install(self, user=user)
419
420
421def find_kernel_specs() -> dict[str, str]:
422 """Returns a dict mapping kernel names to resource directories."""
423 return KernelSpecManager().find_kernel_specs()
424
425
426def get_kernel_spec(kernel_name: str) -> KernelSpec:
427 """Returns a :class:`KernelSpec` instance for the given kernel_name.
428
429 Raises KeyError if the given kernel name is not found.
430 """
431 return KernelSpecManager().get_kernel_spec(kernel_name)
432
433
434def install_kernel_spec(
435 source_dir: str,
436 kernel_name: str | None = None,
437 user: bool = False,
438 replace: bool | None = False,
439 prefix: str | None = None,
440) -> str:
441 """Install a kernel spec in a given directory."""
442 return KernelSpecManager().install_kernel_spec(source_dir, kernel_name, user, replace, prefix)
443
444
445install_kernel_spec.__doc__ = KernelSpecManager.install_kernel_spec.__doc__
446
447
448def install_native_kernel_spec(user: bool = False) -> None:
449 """Install the native kernel spec."""
450 KernelSpecManager().install_native_kernel_spec(user=user)
451
452
453install_native_kernel_spec.__doc__ = KernelSpecManager.install_native_kernel_spec.__doc__