Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_core/paths.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Path utility functions."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9from __future__ import annotations
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from collections.abc import Iterator
19from contextlib import contextmanager
20from pathlib import Path
21from typing import Any, overload
23import platformdirs
25pjoin = os.path.join
27# Capitalize Jupyter in paths only on Windows and MacOS (when not in Homebrew)
28if sys.platform == "win32" or (
29 sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew")
30):
31 APPNAME = "Jupyter"
32else:
33 APPNAME = "jupyter"
35# UF_HIDDEN is a stat flag not defined in the stat module.
36# It is used by BSD to indicate hidden files.
37UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)
40@overload
41def envset(name: str, default: bool = False) -> bool: ...
44@overload
45def envset(name: str, default: None) -> bool | None: ...
48def envset(name: str, default: bool | None = False) -> bool | None:
49 """Return the boolean value of a given environment variable.
51 An environment variable is considered set if it is assigned to a value
52 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive)
54 If the environment variable is not defined, the default value is returned.
55 """
56 if name not in os.environ:
57 return default
59 return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"]
62def use_platform_dirs() -> bool:
63 """Determine if platformdirs should be used for system-specific paths.
65 The default is False.
66 """
67 return envset("JUPYTER_PLATFORM_DIRS", False)
70def get_home_dir() -> str:
71 """Get the real path of the home directory"""
72 homedir = Path("~").expanduser()
73 # Next line will make things work even when /home/ is a symlink to
74 # /usr/home as it is on FreeBSD, for example
75 return str(Path(homedir).resolve())
78_dtemps: dict[str, str] = {}
81def _do_i_own(path: str) -> bool:
82 """Return whether the current user owns the given path"""
83 p = Path(path).resolve()
85 # walk up to first existing parent
86 while not p.exists() and p != p.parent:
87 p = p.parent
89 # simplest check: owner by name
90 # not always implemented or available
91 try:
92 return p.owner() == os.getlogin()
93 except Exception: # noqa: S110
94 pass
96 if hasattr(os, "geteuid"):
97 try:
98 st = p.stat()
99 return st.st_uid == os.geteuid()
100 except (NotImplementedError, OSError):
101 # geteuid not always implemented
102 pass
104 # no ownership checks worked, check write access
105 return os.access(p, os.W_OK)
108def prefer_environment_over_user() -> bool:
109 """Determine if environment-level paths should take precedence over user-level paths."""
110 # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value
111 if "JUPYTER_PREFER_ENV_PATH" in os.environ:
112 return envset("JUPYTER_PREFER_ENV_PATH")
114 # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def)
115 if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix):
116 return True
118 # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True
119 if (
120 "CONDA_PREFIX" in os.environ
121 and sys.prefix.startswith(os.environ["CONDA_PREFIX"])
122 and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
123 and _do_i_own(sys.prefix)
124 ):
125 return True
127 return False
130def _mkdtemp_once(name: str) -> str:
131 """Make or reuse a temporary directory.
133 If this is called with the same name in the same process, it will return
134 the same directory.
135 """
136 try:
137 return _dtemps[name]
138 except KeyError:
139 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
140 return d
143def jupyter_config_dir() -> str:
144 """Get the Jupyter config directory for this platform and user.
146 Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate
147 directory for the platform.
148 """
150 env = os.environ
151 if env.get("JUPYTER_NO_CONFIG"):
152 return _mkdtemp_once("jupyter-clean-cfg")
154 if env.get("JUPYTER_CONFIG_DIR"):
155 return env["JUPYTER_CONFIG_DIR"]
157 if use_platform_dirs():
158 return platformdirs.user_config_dir(APPNAME, appauthor=False)
160 home_dir = get_home_dir()
161 return pjoin(home_dir, ".jupyter")
164def jupyter_data_dir() -> str:
165 """Get the config directory for Jupyter data files for this platform and user.
167 These are non-transient, non-configuration files.
169 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path.
170 """
171 env = os.environ
173 if env.get("JUPYTER_DATA_DIR"):
174 return env["JUPYTER_DATA_DIR"]
176 if use_platform_dirs():
177 return platformdirs.user_data_dir(APPNAME, appauthor=False)
179 home = get_home_dir()
181 if sys.platform == "darwin":
182 return str(Path(home, "Library", "Jupyter"))
183 # Bug in mypy which thinks it's unreachable: https://github.com/python/mypy/issues/10773
184 if sys.platform == "win32":
185 appdata = os.environ.get("APPDATA", None)
186 if appdata:
187 return str(Path(appdata, "jupyter").resolve())
188 return pjoin(jupyter_config_dir(), "data")
189 # Linux, non-OS X Unix, AIX, etc.
190 # Bug in mypy which thinks it's unreachable: https://github.com/python/mypy/issues/10773
191 xdg = env.get("XDG_DATA_HOME", None)
192 if not xdg:
193 xdg = pjoin(home, ".local", "share")
194 return pjoin(xdg, "jupyter")
197def jupyter_runtime_dir() -> str:
198 """Return the runtime dir for transient jupyter files.
200 Returns JUPYTER_RUNTIME_DIR if defined.
202 The default is now (data_dir)/runtime on all platforms;
203 we no longer use XDG_RUNTIME_DIR after various problems.
204 """
205 env = os.environ
207 if env.get("JUPYTER_RUNTIME_DIR"):
208 return env["JUPYTER_RUNTIME_DIR"]
210 return pjoin(jupyter_data_dir(), "runtime")
213# %PROGRAMDATA% is not safe by default, require opt-in to trust it
214_use_programdata: bool = envset("JUPYTER_USE_PROGRAMDATA")
215# _win_programdata is a path str if we're using it, None otherwise
216_win_programdata: str | None = None
217if os.name == "nt" and _use_programdata:
218 _win_programdata = os.environ.get("PROGRAMDATA", None)
221if use_platform_dirs():
222 if os.name == "nt" and not _use_programdata:
223 # default PROGRAMDATA used by site_* is not safe by default on Windows
224 SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
225 else:
226 SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
227 APPNAME, appauthor=False, multipath=True
228 ).split(os.pathsep)
229else: # noqa: PLR5501
230 # default dirs
231 if os.name == "nt":
232 # PROGRAMDATA is not defined by default on XP, and not safe by default
233 if _win_programdata:
234 SYSTEM_JUPYTER_PATH = [pjoin(_win_programdata, "jupyter")]
235 else:
236 SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
237 else:
238 SYSTEM_JUPYTER_PATH = [
239 "/usr/local/share/jupyter",
240 "/usr/share/jupyter",
241 ]
243ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]
246def jupyter_path(*subdirs: str) -> list[str]:
247 """Return a list of directories to search for data files.
249 There are four sources of paths to search:
251 - $JUPYTER_PATH environment variable (always highest priority)
252 - user directories (e.g. ~/.local/share/jupyter)
253 - environment directories (e.g. {sys.prefix}/share/jupyter)
254 - system-wide paths (e.g. /usr/local/share/jupyter)
256 JUPYTER_PATH environment variable has highest priority, if defined,
257 and is purely additive.
259 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level
260 directories will have priority over user-level directories.
261 You can also set JUPYTER_PREFER_ENV_PATH=0 to explicitly prefer user directories.
262 If Jupyter detects that you are in a virtualenv or conda environment,
263 environment paths are also preferred to user paths,
264 otherwise user paths are preferred to environment paths.
266 If the Python site.ENABLE_USER_SITE variable is True, we also add the
267 appropriate Python user site subdirectory to the user-level directories.
269 Finally, system-wide directories, such as `/usr/local/share/jupyter` are searched.
271 If ``*subdirs`` are given, that subdirectory will be added to each element.
274 .. versionchanged:: 5.8
276 On Windows, %PROGRAMDATA% will be used as a system-wide path only if
277 the JUPYTER_USE_PROGRAMDATA env is set.
278 By default, there is no default system-wide path on Windows and the env path
279 is used instead.
281 Examples:
283 >>> jupyter_path()
284 ['~/.local/jupyter', '/usr/local/share/jupyter']
285 >>> jupyter_path('kernels')
286 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels']
287 """
289 paths: list[str] = []
291 # highest priority is explicit environment variable
292 if os.environ.get("JUPYTER_PATH"):
293 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep))
295 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
296 user = [jupyter_data_dir()]
297 if site.ENABLE_USER_SITE:
298 # Check if site.getuserbase() exists to be compatible with virtualenv,
299 # which often does not have this method.
300 userbase: str | None
301 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
303 if userbase:
304 userdir = str(Path(userbase, "share", "jupyter"))
305 if userdir not in user:
306 user.append(userdir)
308 # Windows usually doesn't have a 'system' prefix,
309 # so 'system' and 'env' are the same
310 # make sure that env can still be preferred in this case
311 if ENV_JUPYTER_PATH == SYSTEM_JUPYTER_PATH:
312 env = ENV_JUPYTER_PATH
313 else:
314 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]
316 if prefer_environment_over_user():
317 paths.extend(env)
318 paths.extend(user)
319 else:
320 paths.extend(user)
321 paths.extend(env)
323 # finally, add system paths (can overlap with env, so avoid duplicates)
324 for _path in SYSTEM_JUPYTER_PATH:
325 if _path not in paths:
326 paths.append(_path)
328 # add subdir, if requested
329 if subdirs:
330 paths = [pjoin(p, *subdirs) for p in paths]
331 return paths
334ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]
336if use_platform_dirs():
337 if os.name == "nt" and not _use_programdata:
338 # default PROGRAMDATA is not safe by default on Windows
339 # use ENV to avoid an empty list, since some may assume this is non-empty
340 SYSTEM_CONFIG_PATH = ENV_CONFIG_PATH[:]
341 else:
342 SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
343 APPNAME, appauthor=False, multipath=True
344 ).split(os.pathsep)
345elif os.name == "nt":
346 # PROGRAMDATA is not defined by default on XP, and not safe by default
347 # but make sure it's not empty
348 if _win_programdata:
349 SYSTEM_CONFIG_PATH = [str(Path(_win_programdata, "jupyter"))]
350 else:
351 SYSTEM_CONFIG_PATH = ENV_CONFIG_PATH[:]
352else:
353 SYSTEM_CONFIG_PATH = [
354 "/usr/local/etc/jupyter",
355 "/etc/jupyter",
356 ]
359def jupyter_config_path() -> list[str]:
360 """Return the search path for Jupyter config files as a list.
362 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
363 environment-level directories will have priority over user-level
364 directories.
366 If the Python site.ENABLE_USER_SITE variable is True, we also add the
367 appropriate Python user site subdirectory to the user-level directories.
369 Finally, system-wide directories such as `/usr/local/etc/jupyter` are searched.
372 .. versionchanged:: 5.8
374 On Windows, %PROGRAMDATA% will be used as a system-wide path only if
375 the JUPYTER_USE_PROGRAMDATA env is set.
376 By default, there is no system-wide config path on Windows.
378 Examples:
380 >>> jupyter_config_path()
381 ['~/.jupyter', '~/.local/etc/jupyter', '/usr/local/etc/jupyter', '/etc/jupyter']
383 """
384 if os.environ.get("JUPYTER_NO_CONFIG"):
385 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
386 return [jupyter_config_dir()]
388 paths: list[str] = []
390 # highest priority is explicit environment variable
391 if os.environ.get("JUPYTER_CONFIG_PATH"):
392 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep))
394 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
395 user = [jupyter_config_dir()]
396 if site.ENABLE_USER_SITE:
397 userbase: str | None
398 # Check if site.getuserbase() exists to be compatible with virtualenv,
399 # which often does not have this method.
400 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
402 if userbase:
403 userdir = str(Path(userbase, "etc", "jupyter"))
404 if userdir not in user:
405 user.append(userdir)
407 # Windows usually doesn't have a 'system' prefix,
408 # so 'system' and 'env' are the same
409 # make sure that env can still be preferred in this case
410 if ENV_CONFIG_PATH == SYSTEM_CONFIG_PATH:
411 env = ENV_CONFIG_PATH
412 else:
413 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]
415 if prefer_environment_over_user():
416 paths.extend(env)
417 paths.extend(user)
418 else:
419 paths.extend(user)
420 paths.extend(env)
422 # Finally, system path
423 if ENV_CONFIG_PATH != SYSTEM_CONFIG_PATH:
424 paths.extend(SYSTEM_CONFIG_PATH)
425 return paths
428def exists(path: str) -> bool:
429 """Replacement for `os.path.exists` which works for host mapped volumes
430 on Windows containers
431 """
432 try:
433 os.lstat(path)
434 except OSError:
435 return False
436 return True
439def is_file_hidden_win(abs_path: str | Path, stat_res: Any | None = None) -> bool:
440 """Is a file hidden?
442 This only checks the file itself; it should be called in combination with
443 checking the directory containing the file.
445 Use is_hidden() instead to check the file and its parent directories.
447 Parameters
448 ----------
449 abs_path : unicode
450 The absolute path to check.
451 stat_res : os.stat_result, optional
452 The result of calling stat() on abs_path. If not passed, this function
453 will call stat() internally.
454 """
455 abs_path = Path(abs_path)
456 if abs_path.name.startswith("."):
457 return True
459 if stat_res is None:
460 try:
461 stat_res = Path(abs_path).stat()
462 except OSError as e:
463 if e.errno == errno.ENOENT:
464 return False
465 raise
467 try:
468 if (
469 stat_res.st_file_attributes # type:ignore[union-attr]
470 & stat.FILE_ATTRIBUTE_HIDDEN # type:ignore[attr-defined]
471 ):
472 return True
473 except AttributeError:
474 # allow AttributeError on PyPy for Windows
475 # 'stat_result' object has no attribute 'st_file_attributes'
476 # https://foss.heptapod.net/pypy/pypy/-/issues/3469
477 warnings.warn(
478 "hidden files are not detectable on this system, so no file will be marked as hidden.",
479 stacklevel=2,
480 )
482 return False
485def is_file_hidden_posix(abs_path: str | Path, stat_res: Any | None = None) -> bool:
486 """Is a file hidden?
488 This only checks the file itself; it should be called in combination with
489 checking the directory containing the file.
491 Use is_hidden() instead to check the file and its parent directories.
493 Parameters
494 ----------
495 abs_path : unicode
496 The absolute path to check.
497 stat_res : os.stat_result, optional
498 The result of calling stat() on abs_path. If not passed, this function
499 will call stat() internally.
500 """
501 abs_path = Path(abs_path)
502 if abs_path.name.startswith("."):
503 return True
505 if stat_res is None or stat.S_ISLNK(stat_res.st_mode):
506 try:
507 stat_res = abs_path.stat()
508 except OSError as e:
509 if e.errno == errno.ENOENT:
510 return False
511 raise
513 # check that dirs can be listed
514 if stat.S_ISDIR(stat_res.st_mode): # noqa: SIM102
515 # use x-access, not actual listing, in case of slow/large listings
516 if not os.access(abs_path, os.X_OK | os.R_OK):
517 return True
519 # check UF_HIDDEN
520 if getattr(stat_res, "st_flags", 0) & UF_HIDDEN:
521 return True
523 return False
526if sys.platform == "win32":
527 is_file_hidden = is_file_hidden_win
528else:
529 is_file_hidden = is_file_hidden_posix
532def is_hidden(abs_path: str | Path, abs_root: str | Path = "") -> bool:
533 """Is a file hidden or contained in a hidden directory?
535 This will start with the rightmost path element and work backwards to the
536 given root to see if a path is hidden or in a hidden directory. Hidden is
537 determined by either name starting with '.' or the UF_HIDDEN flag as
538 reported by stat.
540 If abs_path is the same directory as abs_root, it will be visible even if
541 that is a hidden folder. This only checks the visibility of files
542 and directories *within* abs_root.
544 Parameters
545 ----------
546 abs_path : str or Path
547 The absolute path to check for hidden directories.
548 abs_root : str or Path
549 The absolute path of the root directory in which hidden directories
550 should be checked for.
551 """
552 abs_path = Path(os.path.normpath(abs_path))
553 if abs_root:
554 abs_root = Path(os.path.normpath(abs_root))
555 else:
556 abs_root = list(abs_path.parents)[-1]
558 if abs_path == abs_root:
559 # root itself is never hidden
560 return False
562 # check that arguments are valid
563 if not abs_path.is_absolute():
564 _msg = f"{abs_path=} is not absolute. abs_path must be absolute."
565 raise ValueError(_msg)
566 if not abs_root.is_absolute():
567 _msg = f"{abs_root=} is not absolute. abs_root must be absolute."
568 raise ValueError(_msg)
569 if not abs_path.is_relative_to(abs_root):
570 _msg = (
571 f"{abs_path=} is not a subdirectory of {abs_root=}. abs_path must be within abs_root."
572 )
573 raise ValueError(_msg)
575 if is_file_hidden(abs_path):
576 return True
578 relative_path = abs_path.relative_to(abs_root)
579 if any(part.startswith(".") for part in relative_path.parts):
580 return True
582 # check UF_HIDDEN on any location up to root.
583 # is_file_hidden() already checked the file, so start from its parent dir
584 for parent in abs_path.parents:
585 if not parent.exists():
586 continue
587 if parent == abs_root:
588 break
589 try:
590 # may fail on Windows junctions
591 st = parent.lstat()
592 except OSError:
593 return True
594 if getattr(st, "st_flags", 0) & UF_HIDDEN:
595 return True
597 return False
600def win32_restrict_file_to_user(fname: str) -> None:
601 """Secure a windows file to read-only access for the user.
602 Follows guidance from win32 library creator:
603 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
605 This method should be executed against an already generated file which
606 has no secrets written to it yet.
608 Parameters
609 ----------
611 fname : unicode
612 The path to the file to secure
613 """
614 try:
615 import win32api # noqa: PLC0415
616 except ImportError:
617 return _win32_restrict_file_to_user_ctypes(fname)
619 import ntsecuritycon as con # noqa: PLC0415
620 import win32security # noqa: PLC0415
622 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone")
623 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
624 user, _domain, _type = win32security.LookupAccountName(
625 "", win32api.GetUserNameEx(win32api.NameSamCompatible)
626 )
628 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)
630 dacl = win32security.ACL()
631 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone)
632 dacl.AddAccessAllowedAce(
633 win32security.ACL_REVISION,
634 con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE,
635 user,
636 )
637 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)
639 sd.SetSecurityDescriptorDacl(1, dacl, 0)
640 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)
641 return None
644def _win32_restrict_file_to_user_ctypes(fname: str) -> None:
645 """Secure a windows file to read-only access for the user.
647 Follows guidance from win32 library creator:
648 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
650 This method should be executed against an already generated file which
651 has no secrets written to it yet.
653 Parameters
654 ----------
656 fname : unicode
657 The path to the file to secure
658 """
659 import ctypes # noqa: PLC0415
660 from ctypes import wintypes # noqa: PLC0415
662 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined]
663 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined]
665 NameSamCompatible = 2
666 WinBuiltinAdministratorsSid = 26
667 DACL_SECURITY_INFORMATION = 4
668 ACL_REVISION = 2
669 ERROR_INSUFFICIENT_BUFFER = 122
670 ERROR_MORE_DATA = 234
672 SYNCHRONIZE = 0x100000
673 DELETE = 0x00010000
674 STANDARD_RIGHTS_REQUIRED = 0xF0000
675 STANDARD_RIGHTS_READ = 0x20000
676 STANDARD_RIGHTS_WRITE = 0x20000
677 FILE_READ_DATA = 1
678 FILE_READ_EA = 8
679 FILE_READ_ATTRIBUTES = 128
680 FILE_WRITE_DATA = 2
681 FILE_APPEND_DATA = 4
682 FILE_WRITE_EA = 16
683 FILE_WRITE_ATTRIBUTES = 256
684 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
685 FILE_GENERIC_READ = (
686 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
687 )
688 FILE_GENERIC_WRITE = (
689 STANDARD_RIGHTS_WRITE
690 | FILE_WRITE_DATA
691 | FILE_WRITE_ATTRIBUTES
692 | FILE_WRITE_EA
693 | FILE_APPEND_DATA
694 | SYNCHRONIZE
695 )
697 class ACL(ctypes.Structure):
698 _fields_ = [
699 ("AclRevision", wintypes.BYTE),
700 ("Sbz1", wintypes.BYTE),
701 ("AclSize", wintypes.WORD),
702 ("AceCount", wintypes.WORD),
703 ("Sbz2", wintypes.WORD),
704 ]
706 PSID = ctypes.c_void_p
707 PACL = ctypes.POINTER(ACL)
708 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)
710 def _nonzero_success(result: int, func: Any, args: Any) -> Any: # noqa: ARG001
711 if not result:
712 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined]
713 return args
715 secur32.GetUserNameExW.errcheck = _nonzero_success
716 secur32.GetUserNameExW.restype = wintypes.BOOL
717 secur32.GetUserNameExW.argtypes = (
718 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat
719 wintypes.LPWSTR, # LPWSTR lpNameBuffer,
720 wintypes.PULONG, # PULONG nSize
721 )
723 advapi32.CreateWellKnownSid.errcheck = _nonzero_success
724 advapi32.CreateWellKnownSid.restype = wintypes.BOOL
725 advapi32.CreateWellKnownSid.argtypes = (
726 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType
727 PSID, # PSID DomainSid
728 PSID, # PSID pSid
729 wintypes.PDWORD, # DWORD *cbSid
730 )
732 advapi32.LookupAccountNameW.errcheck = _nonzero_success
733 advapi32.LookupAccountNameW.restype = wintypes.BOOL
734 advapi32.LookupAccountNameW.argtypes = (
735 wintypes.LPWSTR, # LPCWSTR lpSystemName
736 wintypes.LPWSTR, # LPCWSTR lpAccountName
737 PSID, # PSID Sid
738 wintypes.LPDWORD, # LPDWORD cbSid
739 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName
740 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName
741 wintypes.LPDWORD, # PSID_NAME_USE peUse
742 )
744 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
745 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
746 advapi32.AddAccessAllowedAce.argtypes = (
747 PACL, # PACL pAcl
748 wintypes.DWORD, # DWORD dwAceRevision
749 wintypes.DWORD, # DWORD AccessMask
750 PSID, # PSID pSid
751 )
753 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
754 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
755 advapi32.SetSecurityDescriptorDacl.argtypes = (
756 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
757 wintypes.BOOL, # BOOL bDaclPresent
758 PACL, # PACL pDacl
759 wintypes.BOOL, # BOOL bDaclDefaulted
760 )
762 advapi32.GetFileSecurityW.errcheck = _nonzero_success
763 advapi32.GetFileSecurityW.restype = wintypes.BOOL
764 advapi32.GetFileSecurityW.argtypes = (
765 wintypes.LPCWSTR, # LPCWSTR lpFileName
766 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation
767 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
768 wintypes.DWORD, # DWORD nLength
769 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded
770 )
772 advapi32.SetFileSecurityW.errcheck = _nonzero_success
773 advapi32.SetFileSecurityW.restype = wintypes.BOOL
774 advapi32.SetFileSecurityW.argtypes = (
775 wintypes.LPCWSTR, # LPCWSTR lpFileName
776 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation
777 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
778 )
780 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
781 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
782 advapi32.MakeAbsoluteSD.argtypes = (
783 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
784 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
785 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize
786 PACL, # PACL pDacl
787 wintypes.LPDWORD, # LPDWORD lpdwDaclSize
788 PACL, # PACL pSacl
789 wintypes.LPDWORD, # LPDWORD lpdwSaclSize
790 PSID, # PSID pOwner
791 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize
792 PSID, # PSID pPrimaryGroup
793 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize
794 )
796 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
797 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
798 advapi32.MakeSelfRelativeSD.argtypes = (
799 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
800 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
801 wintypes.LPDWORD, # LPDWORD lpdwBufferLength
802 )
804 advapi32.InitializeAcl.errcheck = _nonzero_success
805 advapi32.InitializeAcl.restype = wintypes.BOOL
806 advapi32.InitializeAcl.argtypes = (
807 PACL, # PACL pAcl,
808 wintypes.DWORD, # DWORD nAclLength,
809 wintypes.DWORD, # DWORD dwAclRevision
810 )
812 def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
813 # return a SID for predefined aliases
814 pSid = (ctypes.c_char * 1)()
815 cbSid = wintypes.DWORD()
816 try:
817 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
818 except OSError as e:
819 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
820 raise
821 pSid = (ctypes.c_char * cbSid.value)()
822 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
823 return pSid[:]
825 def GetUserNameEx(NameFormat: Any) -> Any:
826 # return the user or other security principal associated with
827 # the calling thread
828 nSize = ctypes.pointer(ctypes.c_ulong(0))
829 try:
830 secur32.GetUserNameExW(NameFormat, None, nSize)
831 except OSError as e:
832 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined]
833 raise
834 if not nSize.contents.value:
835 return None
836 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
837 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
838 return lpNameBuffer.value
840 def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
841 # return a security identifier (SID) for an account on a system
842 # and the name of the domain on which the account was found
843 cbSid = wintypes.DWORD(0)
844 cchReferencedDomainName = wintypes.DWORD(0)
845 peUse = wintypes.DWORD(0)
846 try:
847 advapi32.LookupAccountNameW(
848 lpSystemName,
849 lpAccountName,
850 None,
851 ctypes.byref(cbSid),
852 None,
853 ctypes.byref(cchReferencedDomainName),
854 ctypes.byref(peUse),
855 )
856 except OSError as e:
857 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
858 raise
859 Sid = ctypes.create_unicode_buffer("", cbSid.value)
860 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
861 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
862 success = advapi32.LookupAccountNameW(
863 lpSystemName,
864 lpAccountName,
865 pSid,
866 ctypes.byref(cbSid),
867 lpReferencedDomainName,
868 ctypes.byref(cchReferencedDomainName),
869 ctypes.byref(peUse),
870 )
871 if not success:
872 raise ctypes.WinError() # type:ignore[attr-defined]
873 return pSid, lpReferencedDomainName.value, peUse.value
875 def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
876 # add an access-allowed access control entry (ACE)
877 # to an access control list (ACL)
878 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)
880 def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
881 # return information about the security of a file or directory
882 nLength = wintypes.DWORD(0)
883 try:
884 advapi32.GetFileSecurityW(
885 lpFileName,
886 RequestedInformation,
887 None,
888 0,
889 ctypes.byref(nLength),
890 )
891 except OSError as e:
892 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
893 raise
894 if not nLength.value:
895 return None
896 pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
897 advapi32.GetFileSecurityW(
898 lpFileName,
899 RequestedInformation,
900 pSecurityDescriptor,
901 nLength,
902 ctypes.byref(nLength),
903 )
904 return pSecurityDescriptor
906 def SetFileSecurity(
907 lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
908 ) -> Any:
909 # set the security of a file or directory object
910 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)
912 def SetSecurityDescriptorDacl(
913 pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
914 ) -> Any:
915 # set information in a discretionary access control list (DACL)
916 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)
918 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
919 # return a security descriptor in absolute format
920 # by using a security descriptor in self-relative format as a template
921 pAbsoluteSecurityDescriptor = None
922 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
923 pDacl = None
924 lpdwDaclSize = wintypes.DWORD(0)
925 pSacl = None
926 lpdwSaclSize = wintypes.DWORD(0)
927 pOwner = None
928 lpdwOwnerSize = wintypes.DWORD(0)
929 pPrimaryGroup = None
930 lpdwPrimaryGroupSize = wintypes.DWORD(0)
931 try:
932 advapi32.MakeAbsoluteSD(
933 pSelfRelativeSecurityDescriptor,
934 pAbsoluteSecurityDescriptor,
935 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
936 pDacl,
937 ctypes.byref(lpdwDaclSize),
938 pSacl,
939 ctypes.byref(lpdwSaclSize),
940 pOwner,
941 ctypes.byref(lpdwOwnerSize),
942 pPrimaryGroup,
943 ctypes.byref(lpdwPrimaryGroupSize),
944 )
945 except OSError as e:
946 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
947 raise
948 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
949 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
950 pDacl = ctypes.cast(pDaclData, PACL).contents
951 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
952 pSacl = ctypes.cast(pSaclData, PACL).contents
953 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
954 pOwner = ctypes.cast(pOwnerData, PSID)
955 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
956 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
957 advapi32.MakeAbsoluteSD(
958 pSelfRelativeSecurityDescriptor,
959 pAbsoluteSecurityDescriptor,
960 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
961 pDacl,
962 ctypes.byref(lpdwDaclSize),
963 pSacl,
964 ctypes.byref(lpdwSaclSize),
965 pOwner,
966 lpdwOwnerSize,
967 pPrimaryGroup,
968 ctypes.byref(lpdwPrimaryGroupSize),
969 )
970 return pAbsoluteSecurityDescriptor
972 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
973 # return a security descriptor in self-relative format
974 # by using a security descriptor in absolute format as a template
975 pSelfRelativeSecurityDescriptor = None
976 lpdwBufferLength = wintypes.DWORD(0)
977 try:
978 advapi32.MakeSelfRelativeSD(
979 pAbsoluteSecurityDescriptor,
980 pSelfRelativeSecurityDescriptor,
981 ctypes.byref(lpdwBufferLength),
982 )
983 except OSError as e:
984 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
985 raise
986 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
987 advapi32.MakeSelfRelativeSD(
988 pAbsoluteSecurityDescriptor,
989 pSelfRelativeSecurityDescriptor,
990 ctypes.byref(lpdwBufferLength),
991 )
992 return pSelfRelativeSecurityDescriptor
994 def NewAcl() -> Any:
995 # return a new, initialized ACL (access control list) structure
996 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ?
997 acl_data = ctypes.create_string_buffer(nAclLength)
998 pAcl = ctypes.cast(acl_data, PACL).contents
999 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
1000 return pAcl
1002 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
1003 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]
1005 Acl = NewAcl()
1006 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
1007 AddAccessAllowedAce(
1008 Acl,
1009 ACL_REVISION,
1010 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
1011 SidUser,
1012 )
1014 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
1015 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
1016 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
1017 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)
1019 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
1022def get_file_mode(fname: str) -> int:
1023 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner.
1025 Parameters
1026 ----------
1028 fname : unicode
1029 The path to the file to get mode from
1031 """
1032 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we
1033 # should tolerate the execute bit on the file's owner when validating permissions - thus
1034 # the missing least significant bit on the third octal digit. In addition, we also tolerate
1035 # the sticky bit being set, so the lsb from the fourth octal digit is also removed.
1036 return (
1037 stat.S_IMODE(Path(fname).stat().st_mode) & 0o6677
1038 ) # Use 4 octal digits since S_IMODE does the same
1041allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1")
1044@contextmanager
1045def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
1046 """Opens a file in the most restricted pattern available for
1047 writing content. This limits the file mode to `0o0600` and yields
1048 the resulting opened filed handle.
1050 Parameters
1051 ----------
1053 fname : unicode
1054 The path to the file to write
1056 binary: boolean
1057 Indicates that the file is binary
1058 """
1059 mode = "wb" if binary else "w"
1060 encoding = None if binary else "utf-8"
1061 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
1062 try:
1063 Path(fname).unlink()
1064 except OSError:
1065 # Skip any issues with the file not existing
1066 pass
1068 if os.name == "nt":
1069 if allow_insecure_writes:
1070 # Mounted file systems can have a number of failure modes inside this block.
1071 # For windows machines in insecure mode we simply skip this to avoid failures :/
1072 issue_insecure_write_warning()
1073 else:
1074 # Python on windows does not respect the group and public bits for chmod, so we need
1075 # to take additional steps to secure the contents.
1076 # Touch file preemptively to avoid editing permissions in open files in Windows
1077 fd = os.open(fname, open_flag, 0o0600)
1078 os.close(fd)
1079 open_flag = os.O_WRONLY | os.O_TRUNC
1080 win32_restrict_file_to_user(fname)
1082 with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
1083 if os.name != "nt":
1084 # Enforce that the file got the requested permissions before writing
1085 file_mode = get_file_mode(fname)
1086 if file_mode != 0o0600:
1087 if allow_insecure_writes:
1088 issue_insecure_write_warning()
1089 else:
1090 msg = (
1091 f"Permissions assignment failed for secure file: '{fname}'."
1092 f" Got '{oct(file_mode)}' instead of '0o0600'."
1093 )
1094 raise RuntimeError(msg)
1095 yield f
1098def issue_insecure_write_warning() -> None:
1099 """Issue an insecure write warning."""
1101 def format_warning(msg: str, *args: Any, **kwargs: Any) -> str: # noqa: ARG001
1102 return str(msg) + "\n"
1104 warnings.formatwarning = format_warning # type:ignore[assignment]
1105 warnings.warn(
1106 "WARNING: Insecure writes have been enabled via environment variable "
1107 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
1108 "variable or set its value to 'False'.",
1109 stacklevel=2,
1110 )