1"""Tools for managing kernel specs"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7import json
8import os
9import re
10import shutil
11import typing as t
12import warnings
13
14from jupyter_core.paths import SYSTEM_JUPYTER_PATH, jupyter_data_dir, jupyter_path
15from traitlets import Bool, CaselessStrEnum, Dict, HasTraits, List, Set, Type, Unicode, observe
16from traitlets.config import LoggingConfigurable
17
18from .provisioning import KernelProvisionerFactory as KPF # noqa
19
20pjoin = os.path.join
21
22NATIVE_KERNEL_NAME = "python3"
23
24
25class KernelSpec(HasTraits):
26 """A kernel spec model object."""
27
28 argv: List[str] = List()
29 name = Unicode()
30 mimetype = Unicode()
31 display_name = Unicode()
32 language = Unicode()
33 kernel_protocol_version = Unicode()
34 env = Dict()
35 resource_dir = Unicode()
36 interrupt_mode = CaselessStrEnum(["message", "signal"], default_value="signal")
37 metadata = Dict()
38
39 @classmethod
40 def from_resource_dir(cls: type[KernelSpec], resource_dir: str) -> KernelSpec:
41 """Create a KernelSpec object by reading kernel.json
42
43 Pass the path to the *directory* containing kernel.json.
44 """
45 kernel_file = pjoin(resource_dir, "kernel.json")
46 with open(kernel_file, encoding="utf-8") as f:
47 kernel_dict = json.load(f)
48 return cls(resource_dir=resource_dir, **kernel_dict)
49
50 def to_dict(self) -> dict[str, t.Any]:
51 """Convert the kernel spec to a dict."""
52 d = {
53 "argv": self.argv,
54 "env": self.env,
55 "display_name": self.display_name,
56 "language": self.language,
57 "interrupt_mode": self.interrupt_mode,
58 "metadata": self.metadata,
59 "kernel_protocol_version": self.kernel_protocol_version,
60 }
61
62 return d
63
64 def to_json(self) -> str:
65 """Serialise this kernelspec to a JSON object.
66
67 Returns a string.
68 """
69 return json.dumps(self.to_dict())
70
71
72_kernel_name_pat = re.compile(r"^[a-z0-9._\-]+$", re.IGNORECASE)
73
74
75def _is_valid_kernel_name(name: str) -> t.Any:
76 """Check that a kernel name is valid."""
77 # quote is not unicode-safe on Python 2
78 return _kernel_name_pat.match(name)
79
80
81_kernel_name_description = (
82 "Kernel names can only contain ASCII letters and numbers and these separators:"
83 " - . _ (hyphen, period, and underscore)."
84)
85
86
87def _is_kernel_dir(path: str) -> bool:
88 """Is ``path`` a kernel directory?"""
89 return os.path.isdir(path) and os.path.isfile(pjoin(path, "kernel.json"))
90
91
92def _list_kernels_in(dir: str | None) -> dict[str, str]:
93 """Return a mapping of kernel names to resource directories from dir.
94
95 If dir is None or does not exist, returns an empty dict.
96 """
97 if dir is None or not os.path.isdir(dir):
98 return {}
99 kernels = {}
100 for f in os.listdir(dir):
101 path = pjoin(dir, f)
102 if not _is_kernel_dir(path):
103 continue
104 key = f.lower()
105 if not _is_valid_kernel_name(key):
106 warnings.warn(
107 f"Invalid kernelspec directory name ({_kernel_name_description}): {path}",
108 stacklevel=3,
109 )
110 kernels[key] = path
111 return kernels
112
113
114class NoSuchKernel(KeyError): # noqa
115 """An error raised when there is no kernel of a give name."""
116
117 def __init__(self, name: str) -> None:
118 """Initialize the error."""
119 self.name = name
120
121 def __str__(self) -> str:
122 return f"No such kernel named {self.name}"
123
124
125class KernelSpecManager(LoggingConfigurable):
126 """A manager for kernel specs."""
127
128 kernel_spec_class = Type(
129 KernelSpec,
130 config=True,
131 help="""The kernel spec class. This is configurable to allow
132 subclassing of the KernelSpecManager for customized behavior.
133 """,
134 )
135
136 ensure_native_kernel = Bool(
137 True,
138 config=True,
139 help="""If there is no Python kernelspec registered and the IPython
140 kernel is available, ensure it is added to the spec list.
141 """,
142 )
143
144 data_dir = Unicode()
145
146 def _data_dir_default(self) -> str:
147 return jupyter_data_dir()
148
149 user_kernel_dir = Unicode()
150
151 def _user_kernel_dir_default(self) -> str:
152 return pjoin(self.data_dir, "kernels")
153
154 whitelist = Set(
155 config=True,
156 help="""Deprecated, use `KernelSpecManager.allowed_kernelspecs`
157 """,
158 )
159 allowed_kernelspecs = Set(
160 config=True,
161 help="""List of allowed kernel names.
162
163 By default, all installed kernels are allowed.
164 """,
165 )
166 kernel_dirs: List[str] = List(
167 help="List of kernel directories to search. Later ones take priority over earlier."
168 )
169
170 _deprecated_aliases = {
171 "whitelist": ("allowed_kernelspecs", "7.0"),
172 }
173
174 # Method copied from
175 # https://github.com/jupyterhub/jupyterhub/blob/d1a85e53dccfc7b1dd81b0c1985d158cc6b61820/jupyterhub/auth.py#L143-L161
176 @observe(*list(_deprecated_aliases))
177 def _deprecated_trait(self, change: t.Any) -> None:
178 """observer for deprecated traits"""
179 old_attr = change.name
180 new_attr, version = self._deprecated_aliases[old_attr]
181 new_value = getattr(self, new_attr)
182 if new_value != change.new:
183 # only warn if different
184 # protects backward-compatible config from warnings
185 # if they set the same value under both names
186 self.log.warning(
187 f"{self.__class__.__name__}.{old_attr} is deprecated in jupyter_client "
188 f"{version}, use {self.__class__.__name__}.{new_attr} instead"
189 )
190 setattr(self, new_attr, change.new)
191
192 def _kernel_dirs_default(self) -> list[str]:
193 dirs = jupyter_path("kernels")
194 # At some point, we should stop adding .ipython/kernels to the path,
195 # but the cost to keeping it is very small.
196 try:
197 # this should always be valid on IPython 3+
198 from IPython.paths import get_ipython_dir
199
200 dirs.append(os.path.join(get_ipython_dir(), "kernels"))
201 except ModuleNotFoundError:
202 pass
203 return dirs
204
205 def find_kernel_specs(self) -> dict[str, str]:
206 """Returns a dict mapping kernel names to resource directories."""
207 d = {}
208 for kernel_dir in self.kernel_dirs:
209 kernels = _list_kernels_in(kernel_dir)
210 for kname, spec in kernels.items():
211 if kname not in d:
212 self.log.debug("Found kernel %s in %s", kname, kernel_dir)
213 d[kname] = spec
214
215 if self.ensure_native_kernel and NATIVE_KERNEL_NAME not in d:
216 try:
217 from ipykernel.kernelspec import RESOURCES
218
219 self.log.debug(
220 "Native kernel (%s) available from %s",
221 NATIVE_KERNEL_NAME,
222 RESOURCES,
223 )
224 d[NATIVE_KERNEL_NAME] = RESOURCES
225 except ImportError:
226 self.log.warning("Native kernel (%s) is not available", NATIVE_KERNEL_NAME)
227
228 if self.allowed_kernelspecs:
229 # filter if there's an allow list
230 d = {name: spec for name, spec in d.items() if name in self.allowed_kernelspecs}
231 return d
232 # TODO: Caching?
233
234 def _get_kernel_spec_by_name(self, kernel_name: str, resource_dir: str) -> KernelSpec:
235 """Returns a :class:`KernelSpec` instance for a given kernel_name
236 and resource_dir.
237 """
238 kspec = None
239 if kernel_name == NATIVE_KERNEL_NAME:
240 try:
241 from ipykernel.kernelspec import RESOURCES, get_kernel_dict
242 except ImportError:
243 # It should be impossible to reach this, but let's play it safe
244 pass
245 else:
246 if resource_dir == RESOURCES:
247 kdict = get_kernel_dict()
248 kspec = self.kernel_spec_class(resource_dir=resource_dir, **kdict)
249 if not kspec:
250 kspec = self.kernel_spec_class.from_resource_dir(resource_dir)
251
252 if not KPF.instance(parent=self.parent).is_provisioner_available(kspec):
253 raise NoSuchKernel(kernel_name)
254
255 return kspec
256
257 def _find_spec_directory(self, kernel_name: str) -> str | None:
258 """Find the resource directory of a named kernel spec"""
259 for kernel_dir in [kd for kd in self.kernel_dirs if os.path.isdir(kd)]:
260 files = os.listdir(kernel_dir)
261 for f in files:
262 path = pjoin(kernel_dir, f)
263 if f.lower() == kernel_name and _is_kernel_dir(path):
264 return path
265
266 if kernel_name == NATIVE_KERNEL_NAME:
267 try:
268 from ipykernel.kernelspec import RESOURCES
269 except ImportError:
270 pass
271 else:
272 return RESOURCES
273 return None
274
275 def get_kernel_spec(self, kernel_name: str) -> KernelSpec:
276 """Returns a :class:`KernelSpec` instance for the given kernel_name.
277
278 Raises :exc:`NoSuchKernel` if the given kernel name is not found.
279 """
280 if not _is_valid_kernel_name(kernel_name):
281 self.log.warning(
282 f"Kernelspec name {kernel_name} is invalid: {_kernel_name_description}"
283 )
284
285 resource_dir = self._find_spec_directory(kernel_name.lower())
286 if resource_dir is None:
287 raise NoSuchKernel(kernel_name)
288
289 return self._get_kernel_spec_by_name(kernel_name, resource_dir)
290
291 def get_all_specs(self) -> dict[str, t.Any]:
292 """Returns a dict mapping kernel names to kernelspecs.
293
294 Returns a dict of the form::
295
296 {
297 'kernel_name': {
298 'resource_dir': '/path/to/kernel_name',
299 'spec': {"the spec itself": ...}
300 },
301 ...
302 }
303 """
304 d = self.find_kernel_specs()
305 res = {}
306 for kname, resource_dir in d.items():
307 try:
308 if self.__class__ is KernelSpecManager:
309 spec = self._get_kernel_spec_by_name(kname, resource_dir)
310 else:
311 # avoid calling private methods in subclasses,
312 # which may have overridden find_kernel_specs
313 # and get_kernel_spec, but not the newer get_all_specs
314 spec = self.get_kernel_spec(kname)
315
316 res[kname] = {"resource_dir": resource_dir, "spec": spec.to_dict()}
317 except NoSuchKernel:
318 pass # The appropriate warning has already been logged
319 except Exception:
320 self.log.warning("Error loading kernelspec %r", kname, exc_info=True)
321 return res
322
323 def remove_kernel_spec(self, name: str) -> str:
324 """Remove a kernel spec directory by name.
325
326 Returns the path that was deleted.
327 """
328 save_native = self.ensure_native_kernel
329 try:
330 self.ensure_native_kernel = False
331 specs = self.find_kernel_specs()
332 finally:
333 self.ensure_native_kernel = save_native
334 spec_dir = specs[name]
335 self.log.debug("Removing %s", spec_dir)
336 if os.path.islink(spec_dir):
337 os.remove(spec_dir)
338 else:
339 shutil.rmtree(spec_dir)
340 return spec_dir
341
342 def _get_destination_dir(
343 self, kernel_name: str, user: bool = False, prefix: str | None = None
344 ) -> str:
345 if user:
346 return os.path.join(self.user_kernel_dir, kernel_name)
347 elif prefix:
348 return os.path.join(os.path.abspath(prefix), "share", "jupyter", "kernels", kernel_name)
349 else:
350 return os.path.join(SYSTEM_JUPYTER_PATH[0], "kernels", kernel_name)
351
352 def install_kernel_spec(
353 self,
354 source_dir: str,
355 kernel_name: str | None = None,
356 user: bool = False,
357 replace: bool | None = None,
358 prefix: str | None = None,
359 ) -> str:
360 """Install a kernel spec by copying its directory.
361
362 If ``kernel_name`` is not given, the basename of ``source_dir`` will
363 be used.
364
365 If ``user`` is False, it will attempt to install into the systemwide
366 kernel registry. If the process does not have appropriate permissions,
367 an :exc:`OSError` will be raised.
368
369 If ``prefix`` is given, the kernelspec will be installed to
370 PREFIX/share/jupyter/kernels/KERNEL_NAME. This can be sys.prefix
371 for installation inside virtual or conda envs.
372 """
373 source_dir = source_dir.rstrip("/\\")
374 if not kernel_name:
375 kernel_name = os.path.basename(source_dir)
376 kernel_name = kernel_name.lower()
377 if not _is_valid_kernel_name(kernel_name):
378 msg = f"Invalid kernel name {kernel_name!r}. {_kernel_name_description}"
379 raise ValueError(msg)
380
381 if user and prefix:
382 msg = "Can't specify both user and prefix. Please choose one or the other."
383 raise ValueError(msg)
384
385 if replace is not None:
386 warnings.warn(
387 "replace is ignored. Installing a kernelspec always replaces an existing "
388 "installation",
389 DeprecationWarning,
390 stacklevel=2,
391 )
392
393 destination = self._get_destination_dir(kernel_name, user=user, prefix=prefix)
394 self.log.debug("Installing kernelspec in %s", destination)
395
396 kernel_dir = os.path.dirname(destination)
397 if kernel_dir not in self.kernel_dirs:
398 self.log.warning(
399 "Installing to %s, which is not in %s. The kernelspec may not be found.",
400 kernel_dir,
401 self.kernel_dirs,
402 )
403
404 if os.path.isdir(destination):
405 self.log.info("Removing existing kernelspec in %s", destination)
406 shutil.rmtree(destination)
407
408 shutil.copytree(source_dir, destination)
409 self.log.info("Installed kernelspec %s in %s", kernel_name, destination)
410 return destination
411
412 def install_native_kernel_spec(self, user: bool = False) -> None:
413 """DEPRECATED: Use ipykernel.kernelspec.install"""
414 warnings.warn(
415 "install_native_kernel_spec is deprecated. Use ipykernel.kernelspec import install.",
416 stacklevel=2,
417 )
418 from ipykernel.kernelspec import install
419
420 install(self, user=user)
421
422
423def find_kernel_specs() -> dict[str, str]:
424 """Returns a dict mapping kernel names to resource directories."""
425 return KernelSpecManager().find_kernel_specs()
426
427
428def get_kernel_spec(kernel_name: str) -> KernelSpec:
429 """Returns a :class:`KernelSpec` instance for the given kernel_name.
430
431 Raises KeyError if the given kernel name is not found.
432 """
433 return KernelSpecManager().get_kernel_spec(kernel_name)
434
435
436def install_kernel_spec(
437 source_dir: str,
438 kernel_name: str | None = None,
439 user: bool = False,
440 replace: bool | None = False,
441 prefix: str | None = None,
442) -> str:
443 """Install a kernel spec in a given directory."""
444 return KernelSpecManager().install_kernel_spec(source_dir, kernel_name, user, replace, prefix)
445
446
447install_kernel_spec.__doc__ = KernelSpecManager.install_kernel_spec.__doc__
448
449
450def install_native_kernel_spec(user: bool = False) -> None:
451 """Install the native kernel spec."""
452 KernelSpecManager().install_native_kernel_spec(user=user)
453
454
455install_native_kernel_spec.__doc__ = KernelSpecManager.install_native_kernel_spec.__doc__