1"""Tools for managing kernel specs"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4from __future__ import annotations
5
6import json
7import os
8import re
9import shutil
10import typing as t
11import warnings
12
13from jupyter_core.paths import SYSTEM_JUPYTER_PATH, jupyter_data_dir, jupyter_path
14from traitlets import Bool, CaselessStrEnum, Dict, HasTraits, List, Set, Type, Unicode, observe
15from traitlets.config import LoggingConfigurable
16
17from .provisioning import KernelProvisionerFactory as KPF # noqa
18
19pjoin = os.path.join
20
21NATIVE_KERNEL_NAME = "python3"
22
23
24class KernelSpec(HasTraits):
25 """A kernel spec model object."""
26
27 argv: List[str] = List()
28 name = Unicode()
29 mimetype = Unicode()
30 display_name = Unicode()
31 language = Unicode()
32 env = Dict()
33 resource_dir = Unicode()
34 interrupt_mode = CaselessStrEnum(["message", "signal"], default_value="signal")
35 metadata = Dict()
36
37 @classmethod
38 def from_resource_dir(cls: type[KernelSpec], resource_dir: str) -> KernelSpec:
39 """Create a KernelSpec object by reading kernel.json
40
41 Pass the path to the *directory* containing kernel.json.
42 """
43 kernel_file = pjoin(resource_dir, "kernel.json")
44 with open(kernel_file, encoding="utf-8") as f:
45 kernel_dict = json.load(f)
46 return cls(resource_dir=resource_dir, **kernel_dict)
47
48 def to_dict(self) -> dict[str, t.Any]:
49 """Convert the kernel spec to a dict."""
50 d = {
51 "argv": self.argv,
52 "env": self.env,
53 "display_name": self.display_name,
54 "language": self.language,
55 "interrupt_mode": self.interrupt_mode,
56 "metadata": self.metadata,
57 }
58
59 return d
60
61 def to_json(self) -> str:
62 """Serialise this kernelspec to a JSON object.
63
64 Returns a string.
65 """
66 return json.dumps(self.to_dict())
67
68
69_kernel_name_pat = re.compile(r"^[a-z0-9._\-]+$", re.IGNORECASE)
70
71
72def _is_valid_kernel_name(name: str) -> t.Any:
73 """Check that a kernel name is valid."""
74 # quote is not unicode-safe on Python 2
75 return _kernel_name_pat.match(name)
76
77
78_kernel_name_description = (
79 "Kernel names can only contain ASCII letters and numbers and these separators:"
80 " - . _ (hyphen, period, and underscore)."
81)
82
83
84def _is_kernel_dir(path: str) -> bool:
85 """Is ``path`` a kernel directory?"""
86 return os.path.isdir(path) and os.path.isfile(pjoin(path, "kernel.json"))
87
88
89def _list_kernels_in(dir: str | None) -> dict[str, str]:
90 """Return a mapping of kernel names to resource directories from dir.
91
92 If dir is None or does not exist, returns an empty dict.
93 """
94 if dir is None or not os.path.isdir(dir):
95 return {}
96 kernels = {}
97 for f in os.listdir(dir):
98 path = pjoin(dir, f)
99 if not _is_kernel_dir(path):
100 continue
101 key = f.lower()
102 if not _is_valid_kernel_name(key):
103 warnings.warn(
104 f"Invalid kernelspec directory name ({_kernel_name_description}): {path}",
105 stacklevel=3,
106 )
107 kernels[key] = path
108 return kernels
109
110
111class NoSuchKernel(KeyError): # noqa
112 """An error raised when there is no kernel of a give name."""
113
114 def __init__(self, name: str) -> None:
115 """Initialize the error."""
116 self.name = name
117
118 def __str__(self) -> str:
119 return f"No such kernel named {self.name}"
120
121
122class KernelSpecManager(LoggingConfigurable):
123 """A manager for kernel specs."""
124
125 kernel_spec_class = Type(
126 KernelSpec,
127 config=True,
128 help="""The kernel spec class. This is configurable to allow
129 subclassing of the KernelSpecManager for customized behavior.
130 """,
131 )
132
133 ensure_native_kernel = Bool(
134 True,
135 config=True,
136 help="""If there is no Python kernelspec registered and the IPython
137 kernel is available, ensure it is added to the spec list.
138 """,
139 )
140
141 data_dir = Unicode()
142
143 def _data_dir_default(self) -> str:
144 return jupyter_data_dir()
145
146 user_kernel_dir = Unicode()
147
148 def _user_kernel_dir_default(self) -> str:
149 return pjoin(self.data_dir, "kernels")
150
151 whitelist = Set(
152 config=True,
153 help="""Deprecated, use `KernelSpecManager.allowed_kernelspecs`
154 """,
155 )
156 allowed_kernelspecs = Set(
157 config=True,
158 help="""List of allowed kernel names.
159
160 By default, all installed kernels are allowed.
161 """,
162 )
163 kernel_dirs: List[str] = List(
164 help="List of kernel directories to search. Later ones take priority over earlier."
165 )
166
167 _deprecated_aliases = {
168 "whitelist": ("allowed_kernelspecs", "7.0"),
169 }
170
171 # Method copied from
172 # https://github.com/jupyterhub/jupyterhub/blob/d1a85e53dccfc7b1dd81b0c1985d158cc6b61820/jupyterhub/auth.py#L143-L161
173 @observe(*list(_deprecated_aliases))
174 def _deprecated_trait(self, change: t.Any) -> None:
175 """observer for deprecated traits"""
176 old_attr = change.name
177 new_attr, version = self._deprecated_aliases[old_attr]
178 new_value = getattr(self, new_attr)
179 if new_value != change.new:
180 # only warn if different
181 # protects backward-compatible config from warnings
182 # if they set the same value under both names
183 self.log.warning(
184 f"{self.__class__.__name__}.{old_attr} is deprecated in jupyter_client "
185 f"{version}, use {self.__class__.__name__}.{new_attr} instead"
186 )
187 setattr(self, new_attr, change.new)
188
189 def _kernel_dirs_default(self) -> list[str]:
190 dirs = jupyter_path("kernels")
191 # At some point, we should stop adding .ipython/kernels to the path,
192 # but the cost to keeping it is very small.
193 try:
194 # this should always be valid on IPython 3+
195 from IPython.paths import get_ipython_dir
196
197 dirs.append(os.path.join(get_ipython_dir(), "kernels"))
198 except ModuleNotFoundError:
199 pass
200 return dirs
201
202 def find_kernel_specs(self) -> dict[str, str]:
203 """Returns a dict mapping kernel names to resource directories."""
204 d = {}
205 for kernel_dir in self.kernel_dirs:
206 kernels = _list_kernels_in(kernel_dir)
207 for kname, spec in kernels.items():
208 if kname not in d:
209 self.log.debug("Found kernel %s in %s", kname, kernel_dir)
210 d[kname] = spec
211
212 if self.ensure_native_kernel and NATIVE_KERNEL_NAME not in d:
213 try:
214 from ipykernel.kernelspec import RESOURCES
215
216 self.log.debug(
217 "Native kernel (%s) available from %s",
218 NATIVE_KERNEL_NAME,
219 RESOURCES,
220 )
221 d[NATIVE_KERNEL_NAME] = RESOURCES
222 except ImportError:
223 self.log.warning("Native kernel (%s) is not available", NATIVE_KERNEL_NAME)
224
225 if self.allowed_kernelspecs:
226 # filter if there's an allow list
227 d = {name: spec for name, spec in d.items() if name in self.allowed_kernelspecs}
228 return d
229 # TODO: Caching?
230
231 def _get_kernel_spec_by_name(self, kernel_name: str, resource_dir: str) -> KernelSpec:
232 """Returns a :class:`KernelSpec` instance for a given kernel_name
233 and resource_dir.
234 """
235 kspec = None
236 if kernel_name == NATIVE_KERNEL_NAME:
237 try:
238 from ipykernel.kernelspec import RESOURCES, get_kernel_dict
239 except ImportError:
240 # It should be impossible to reach this, but let's play it safe
241 pass
242 else:
243 if resource_dir == RESOURCES:
244 kdict = get_kernel_dict()
245 kspec = self.kernel_spec_class(resource_dir=resource_dir, **kdict)
246 if not kspec:
247 kspec = self.kernel_spec_class.from_resource_dir(resource_dir)
248
249 if not KPF.instance(parent=self.parent).is_provisioner_available(kspec):
250 raise NoSuchKernel(kernel_name)
251
252 return kspec
253
254 def _find_spec_directory(self, kernel_name: str) -> str | None:
255 """Find the resource directory of a named kernel spec"""
256 for kernel_dir in [kd for kd in self.kernel_dirs if os.path.isdir(kd)]:
257 files = os.listdir(kernel_dir)
258 for f in files:
259 path = pjoin(kernel_dir, f)
260 if f.lower() == kernel_name and _is_kernel_dir(path):
261 return path
262
263 if kernel_name == NATIVE_KERNEL_NAME:
264 try:
265 from ipykernel.kernelspec import RESOURCES
266 except ImportError:
267 pass
268 else:
269 return RESOURCES
270 return None
271
272 def get_kernel_spec(self, kernel_name: str) -> KernelSpec:
273 """Returns a :class:`KernelSpec` instance for the given kernel_name.
274
275 Raises :exc:`NoSuchKernel` if the given kernel name is not found.
276 """
277 if not _is_valid_kernel_name(kernel_name):
278 self.log.warning(
279 f"Kernelspec name {kernel_name} is invalid: {_kernel_name_description}"
280 )
281
282 resource_dir = self._find_spec_directory(kernel_name.lower())
283 if resource_dir is None:
284 self.log.warning("Kernelspec name %s cannot be found!", kernel_name)
285 raise NoSuchKernel(kernel_name)
286
287 return self._get_kernel_spec_by_name(kernel_name, resource_dir)
288
289 def get_all_specs(self) -> dict[str, t.Any]:
290 """Returns a dict mapping kernel names to kernelspecs.
291
292 Returns a dict of the form::
293
294 {
295 'kernel_name': {
296 'resource_dir': '/path/to/kernel_name',
297 'spec': {"the spec itself": ...}
298 },
299 ...
300 }
301 """
302 d = self.find_kernel_specs()
303 res = {}
304 for kname, resource_dir in d.items():
305 try:
306 if self.__class__ is KernelSpecManager:
307 spec = self._get_kernel_spec_by_name(kname, resource_dir)
308 else:
309 # avoid calling private methods in subclasses,
310 # which may have overridden find_kernel_specs
311 # and get_kernel_spec, but not the newer get_all_specs
312 spec = self.get_kernel_spec(kname)
313
314 res[kname] = {"resource_dir": resource_dir, "spec": spec.to_dict()}
315 except NoSuchKernel:
316 pass # The appropriate warning has already been logged
317 except Exception:
318 self.log.warning("Error loading kernelspec %r", kname, exc_info=True)
319 return res
320
321 def remove_kernel_spec(self, name: str) -> str:
322 """Remove a kernel spec directory by name.
323
324 Returns the path that was deleted.
325 """
326 save_native = self.ensure_native_kernel
327 try:
328 self.ensure_native_kernel = False
329 specs = self.find_kernel_specs()
330 finally:
331 self.ensure_native_kernel = save_native
332 spec_dir = specs[name]
333 self.log.debug("Removing %s", spec_dir)
334 if os.path.islink(spec_dir):
335 os.remove(spec_dir)
336 else:
337 shutil.rmtree(spec_dir)
338 return spec_dir
339
340 def _get_destination_dir(
341 self, kernel_name: str, user: bool = False, prefix: str | None = None
342 ) -> str:
343 if user:
344 return os.path.join(self.user_kernel_dir, kernel_name)
345 elif prefix:
346 return os.path.join(os.path.abspath(prefix), "share", "jupyter", "kernels", kernel_name)
347 else:
348 return os.path.join(SYSTEM_JUPYTER_PATH[0], "kernels", kernel_name)
349
350 def install_kernel_spec(
351 self,
352 source_dir: str,
353 kernel_name: str | None = None,
354 user: bool = False,
355 replace: bool | None = None,
356 prefix: str | None = None,
357 ) -> str:
358 """Install a kernel spec by copying its directory.
359
360 If ``kernel_name`` is not given, the basename of ``source_dir`` will
361 be used.
362
363 If ``user`` is False, it will attempt to install into the systemwide
364 kernel registry. If the process does not have appropriate permissions,
365 an :exc:`OSError` will be raised.
366
367 If ``prefix`` is given, the kernelspec will be installed to
368 PREFIX/share/jupyter/kernels/KERNEL_NAME. This can be sys.prefix
369 for installation inside virtual or conda envs.
370 """
371 source_dir = source_dir.rstrip("/\\")
372 if not kernel_name:
373 kernel_name = os.path.basename(source_dir)
374 kernel_name = kernel_name.lower()
375 if not _is_valid_kernel_name(kernel_name):
376 msg = f"Invalid kernel name {kernel_name!r}. {_kernel_name_description}"
377 raise ValueError(msg)
378
379 if user and prefix:
380 msg = "Can't specify both user and prefix. Please choose one or the other."
381 raise ValueError(msg)
382
383 if replace is not None:
384 warnings.warn(
385 "replace is ignored. Installing a kernelspec always replaces an existing "
386 "installation",
387 DeprecationWarning,
388 stacklevel=2,
389 )
390
391 destination = self._get_destination_dir(kernel_name, user=user, prefix=prefix)
392 self.log.debug("Installing kernelspec in %s", destination)
393
394 kernel_dir = os.path.dirname(destination)
395 if kernel_dir not in self.kernel_dirs:
396 self.log.warning(
397 "Installing to %s, which is not in %s. The kernelspec may not be found.",
398 kernel_dir,
399 self.kernel_dirs,
400 )
401
402 if os.path.isdir(destination):
403 self.log.info("Removing existing kernelspec in %s", destination)
404 shutil.rmtree(destination)
405
406 shutil.copytree(source_dir, destination)
407 self.log.info("Installed kernelspec %s in %s", kernel_name, destination)
408 return destination
409
410 def install_native_kernel_spec(self, user: bool = False) -> None:
411 """DEPRECATED: Use ipykernel.kernelspec.install"""
412 warnings.warn(
413 "install_native_kernel_spec is deprecated. Use ipykernel.kernelspec import install.",
414 stacklevel=2,
415 )
416 from ipykernel.kernelspec import install
417
418 install(self, user=user)
419
420
421def find_kernel_specs() -> dict[str, str]:
422 """Returns a dict mapping kernel names to resource directories."""
423 return KernelSpecManager().find_kernel_specs()
424
425
426def get_kernel_spec(kernel_name: str) -> KernelSpec:
427 """Returns a :class:`KernelSpec` instance for the given kernel_name.
428
429 Raises KeyError if the given kernel name is not found.
430 """
431 return KernelSpecManager().get_kernel_spec(kernel_name)
432
433
434def install_kernel_spec(
435 source_dir: str,
436 kernel_name: str | None = None,
437 user: bool = False,
438 replace: bool | None = False,
439 prefix: str | None = None,
440) -> str:
441 """Install a kernel spec in a given directory."""
442 return KernelSpecManager().install_kernel_spec(source_dir, kernel_name, user, replace, prefix)
443
444
445install_kernel_spec.__doc__ = KernelSpecManager.install_kernel_spec.__doc__
446
447
448def install_native_kernel_spec(user: bool = False) -> None:
449 """Install the native kernel spec."""
450 KernelSpecManager().install_native_kernel_spec(user=user)
451
452
453install_native_kernel_spec.__doc__ = KernelSpecManager.install_native_kernel_spec.__doc__