Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Path utility functions."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9from __future__ import annotations
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any, Iterator, Optional
22import platformdirs
24from .utils import deprecation
# Short alias for joining path components, used throughout this module.
pjoin = os.path.join

# The application name is capitalized only on Windows and on macOS when the
# interpreter does not live under the Homebrew prefix; lowercase otherwise.
APPNAME = (
    "Jupyter"
    if sys.platform == "win32"
    or (sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew"))
    else "jupyter"
)

# UF_HIDDEN is the BSD "hidden file" stat flag. Use the value exported by the
# stat module when it has one, otherwise fall back to the literal 32768.
UF_HIDDEN = stat.UF_HIDDEN if hasattr(stat, "UF_HIDDEN") else 32768
def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
    """Interpret an environment variable as a boolean.

    A variable that is present counts as true unless its value, compared
    case-insensitively, is one of 'no', 'n', 'false', 'off', '0' or '0.0'.

    Parameters
    ----------
    name : str
        Name of the environment variable to inspect.
    default : bool or None
        Value returned unchanged when the variable is not defined at all.
    """
    raw = os.environ.get(name)
    if raw is None:
        # Not defined: hand back the caller's default untouched.
        return default

    falsy_spellings = ("no", "n", "false", "off", "0", "0.0")
    return raw.lower() not in falsy_spellings
def use_platform_dirs() -> bool:
    """Report whether platformdirs should supply the system-specific paths.

    The decision is read from the ``JUPYTER_PLATFORM_DIRS`` environment
    variable and is False when that variable is not defined.

    We plan for this to default to False in jupyter_core version 5 and to True
    in jupyter_core version 6.
    """
    enabled = envset("JUPYTER_PLATFORM_DIRS", False)
    return enabled  # type:ignore[return-value]
def get_home_dir() -> str:
    """Return the fully resolved path of the current user's home directory."""
    expanded = Path("~").expanduser()
    # Resolving follows symlinks, so this keeps working when /home is itself
    # a symlink (for example to /usr/home on FreeBSD).
    resolved = expanded.resolve()
    return str(resolved)
# Cache of temporary directories created by _mkdtemp_once, keyed by the name
# each directory was requested under.
_dtemps: dict[str, str] = {}
def _do_i_own(path: str) -> bool:
    """Tell whether the current user owns ``path``.

    When ``path`` does not exist, its closest existing ancestor is examined
    instead. Three strategies are tried in order: comparing the owner's name
    with the login name, comparing the owning uid with the effective uid, and
    finally falling back to a plain write-access check.
    """
    candidate = Path(path).resolve()

    # Climb towards the filesystem root until something actually exists.
    while not (candidate.exists() or candidate == candidate.parent):
        candidate = candidate.parent

    # Cheapest check: owner name versus login name. Neither lookup is
    # implemented or available everywhere, so any failure just moves on.
    try:
        return candidate.owner() == os.getlogin()
    except Exception:  # noqa: S110
        pass

    if hasattr(os, "geteuid"):
        try:
            return candidate.stat().st_uid == os.geteuid()
        except (NotImplementedError, OSError):
            # geteuid is not always implemented
            pass

    # No ownership check worked; settle for "can I write to it?".
    return os.access(candidate, os.W_OK)
def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths.

    An explicit ``JUPYTER_PREFER_ENV_PATH`` setting always wins. Otherwise the
    answer is True when the interpreter runs from a virtualenv, or from a
    non-base conda/mamba environment, whose prefix the current user owns.
    """
    environ = os.environ

    # A defined JUPYTER_PREFER_ENV_PATH signals user intent: honour its value.
    if "JUPYTER_PREFER_ENV_PATH" in environ:
        return envset("JUPYTER_PREFER_ENV_PATH")  # type:ignore[return-value]

    prefix = sys.prefix

    # Python virtualenv (see https://docs.python.org/3/library/venv.html#venv-def)
    in_virtualenv = prefix != sys.base_prefix
    if in_virtualenv and _do_i_own(prefix):
        return True

    # conda/mamba environment that is not the root ("base") environment
    in_conda_env = (
        "CONDA_PREFIX" in environ
        and prefix.startswith(environ["CONDA_PREFIX"])
        and environ.get("CONDA_DEFAULT_ENV", "base") != "base"
    )
    if in_conda_env and _do_i_own(prefix):
        return True

    return False
def _mkdtemp_once(name: str) -> str:
    """Create a temporary directory for ``name``, or reuse the existing one.

    Repeated calls with the same name inside one process all return the same
    directory; the directory is created with ``name`` plus a dash as prefix.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]
def jupyter_config_dir() -> str:
    """Return the Jupyter config directory for this platform and user.

    Resolution order:

    1. a throwaway temporary directory when ``JUPYTER_NO_CONFIG`` is set,
    2. ``JUPYTER_CONFIG_DIR`` when it is set,
    3. the platformdirs user config directory when platformdirs is enabled,
    4. ``.jupyter`` inside the user's home directory.
    """
    environ = os.environ

    if environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit = environ.get("JUPYTER_CONFIG_DIR")
    if explicit:
        return explicit

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")
def jupyter_data_dir() -> str:
    """Return the directory for Jupyter data files for this platform and user.

    These are non-transient, non-configuration files.

    ``JUPYTER_DATA_DIR`` is returned when set; otherwise a platform-appropriate
    location is chosen (via platformdirs when that is enabled).
    """
    environ = os.environ

    explicit = environ.get("JUPYTER_DATA_DIR")
    if explicit:
        return explicit

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()
    platform = sys.platform

    if platform == "darwin":
        return str(Path(home, "Library", "Jupyter"))

    if platform == "win32":
        appdata = environ.get("APPDATA", None)
        if not appdata:
            # Without APPDATA, keep data below the config directory.
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.: honour XDG_DATA_HOME when it is set.
    xdg_data_home = environ.get("XDG_DATA_HOME", None) or pjoin(home, ".local", "share")
    return pjoin(xdg_data_home, "jupyter")
def jupyter_runtime_dir() -> str:
    """Return the directory used for transient Jupyter runtime files.

    ``JUPYTER_RUNTIME_DIR`` is returned when set. Otherwise the result is the
    ``runtime`` subdirectory of the data directory on every platform;
    XDG_RUNTIME_DIR is no longer used after various problems.
    """
    explicit = os.environ.get("JUPYTER_RUNTIME_DIR")
    if explicit:
        return explicit
    return pjoin(jupyter_data_dir(), "runtime")
# System-wide data directories. Without platformdirs these are fixed
# per-OS locations, and a deprecation notice announces the planned migration.
if not use_platform_dirs():
    deprecation(
        "Jupyter is migrating its paths to use standard platformdirs\n"
        "given by the platformdirs library.  To remove this warning and\n"
        "see the appropriate new directories, set the environment variable\n"
        "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
        "The use of platformdirs will be the default in `jupyter_core` v6"
    )
    if os.name != "nt":
        SYSTEM_JUPYTER_PATH = [
            "/usr/local/share/jupyter",
            "/usr/share/jupyter",
        ]
    else:
        programdata = os.environ.get("PROGRAMDATA", None)
        if programdata:
            SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")]
        else:  # PROGRAMDATA is not defined by default on XP.
            SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
else:
    SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)

# Data directory that belongs to the running Python environment (sys.prefix).
ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]
def jupyter_path(*subdirs: str) -> list[str]:
    """Return the ordered list of directories searched for data files.

    Priority, highest first:

    1. entries of the ``JUPYTER_PATH`` environment variable,
    2. user-level and environment-level directories; the environment-level
       ones come first when environment paths are preferred (for instance
       because ``JUPYTER_PREFER_ENV_PATH`` is set), otherwise the user-level
       ones do,
    3. the system-wide directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching
    ``share/jupyter`` directory under the Python user base is added to the
    user-level directories.

    If ``*subdirs`` are given, they are joined onto every returned directory,
    e.g. ``jupyter_path('kernels')`` yields each search directory with
    ``kernels`` appended.
    """
    search: list[str] = []

    # highest priority is the explicit environment variable
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # user-level directories
    user_level = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so fall back to
        # the USER_BASE attribute in that case.
        userbase: Optional[str]
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "share", "jupyter"))
            if candidate not in user_level:
                user_level.append(candidate)

    # environment-level directories, minus any that are already system-wide
    env_level = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    if prefer_environment_over_user():
        ordered_groups = (env_level, user_level)
    else:
        ordered_groups = (user_level, env_level)
    for group in ordered_groups:
        search.extend(group)

    # finally, system
    search.extend(SYSTEM_JUPYTER_PATH)

    # add subdir, if requested
    if subdirs:
        return [pjoin(base, *subdirs) for base in search]
    return search
# System-wide config directories: taken from platformdirs when enabled,
# otherwise fixed per-OS locations.
if not use_platform_dirs():
    if os.name != "nt":
        SYSTEM_CONFIG_PATH = [
            "/usr/local/etc/jupyter",
            "/etc/jupyter",
        ]
    else:
        programdata = os.environ.get("PROGRAMDATA", None)
        if programdata:  # noqa: SIM108
            SYSTEM_CONFIG_PATH = [str(Path(programdata, "jupyter"))]
        else:  # PROGRAMDATA is not defined by default on XP.
            SYSTEM_CONFIG_PATH = []
else:
    SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)

# Config directory that belongs to the running Python environment (sys.prefix).
ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]
def jupyter_config_path() -> list[str]:
    """Return the ordered search path for Jupyter config files.

    When ``JUPYTER_NO_CONFIG`` is set, the only entry is the blank config
    directory. Otherwise the order is: entries of ``JUPYTER_CONFIG_PATH``,
    then user-level and environment-level directories (environment first when
    environment paths are preferred, e.g. via ``JUPYTER_PREFER_ENV_PATH``),
    then the system-wide directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching
    ``etc/jupyter`` directory under the Python user base is added to the
    user-level directories.
    """
    environ = os.environ

    if environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    search: list[str] = []

    # highest priority is the explicit environment variable
    explicit = environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # user-level directories
    user_level = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        userbase: Optional[str]
        # site.getuserbase() may be missing under virtualenv, so fall back to
        # the USER_BASE attribute in that case.
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "etc", "jupyter"))
            if candidate not in user_level:
                user_level.append(candidate)

    # environment-level directories, minus any that are already system-wide
    env_level = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    if prefer_environment_over_user():
        ordered_groups = (env_level, user_level)
    else:
        ordered_groups = (user_level, env_level)
    for group in ordered_groups:
        search.extend(group)

    # Finally, system path
    search.extend(SYSTEM_CONFIG_PATH)
    return search
def exists(path: str) -> bool:
    """Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers.

    The path counts as existing whenever ``os.lstat`` succeeds on it; any
    ``OSError`` from that call means it does not.
    """
    try:
        os.lstat(path)
    except OSError:
        return False
    else:
        return True
def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Is a file hidden? (Windows flavour)

    Only the file itself is inspected, so combine this with a check of the
    containing directories; is_hidden() does both.

    A file is hidden when its name starts with a dot or when its stat result
    carries the FILE_ATTRIBUTE_HIDDEN attribute. A file that does not exist
    is reported as not hidden.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = target.stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    try:
        hidden_bits = (
            stat_res.st_file_attributes  # type:ignore[union-attr]
            & stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
        )
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        return False

    return bool(hidden_bits)
def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Is a file hidden? (POSIX flavour)

    Only the file itself is inspected, so combine this with a check of the
    containing directories; is_hidden() does both.

    A file is hidden when its name starts with a dot, when it is a directory
    that cannot be both read and traversed, or when its stat flags include
    UF_HIDDEN. A file that does not exist is reported as not hidden.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, or if it
        describes a symlink, this function will call stat() internally.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = target.stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    # Directories must be listable. Test access bits rather than actually
    # listing, in case of slow/large listings.
    is_directory = stat.S_ISDIR(stat_res.st_mode)
    if is_directory and not os.access(abs_path, os.X_OK | os.R_OK):
        return True

    # check UF_HIDDEN
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)
# Bind the platform-appropriate implementation to the generic name.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix
def is_hidden(abs_path: str, abs_root: str = "") -> bool:
    """Is a file hidden or contained in a hidden directory?

    The path itself is checked first, then every component between it and
    ``abs_root``. "Hidden" means a name starting with '.' or the UF_HIDDEN
    flag as reported by stat.

    If abs_path is the same directory as abs_root, it is reported as visible
    even if that directory is itself hidden: only files and directories
    *within* abs_root are examined.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for.
    """
    abs_path = os.path.normpath(abs_path)
    abs_root = os.path.normpath(abs_root)

    if abs_path == abs_root:
        return False

    if is_file_hidden(abs_path):
        return True

    # NOTE(review): os.path.normpath("") returns ".", so an omitted abs_root
    # reaches this point as "." and this fallback looks unreachable - confirm
    # whether that is intended.
    if not abs_root:
        abs_root = abs_path.split(os.sep, 1)[0] + os.sep

    relative = abs_path[len(abs_root) :]
    for part in Path(relative).parts:
        if part.startswith("."):
            return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    current = str(Path(abs_path).parent)
    while current and current.startswith(abs_root) and current != abs_root:
        if Path(current).exists():
            try:
                # may fail on Windows junctions
                st = os.lstat(current)
            except OSError:
                return True
            if getattr(st, "st_flags", 0) & UF_HIDDEN:
                return True
        current = str(Path(current).parent)

    return False
def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict a Windows file to the current user and administrators.

    The file's DACL is replaced by one that grants the current user generic
    read, generic write and delete rights, and the built-in Administrators
    group full access. When the ``win32api`` module cannot be imported, the
    ctypes-based implementation is used instead.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        return _win32_restrict_file_to_user_ctypes(fname)

    import ntsecuritycon as con
    import win32security

    dacl_info = win32security.DACL_SECURITY_INFORMATION
    revision = win32security.ACL_REVISION

    admins_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user_sid, _domain, _type = win32security.LookupAccountName("", account_name)

    descriptor = win32security.GetFileSecurity(fname, dacl_info)

    # Build a fresh ACL holding exactly two allow entries: user, then admins.
    acl = win32security.ACL()
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    acl.AddAccessAllowedAce(revision, user_rights, user_sid)
    acl.AddAccessAllowedAce(revision, con.FILE_ALL_ACCESS, admins_sid)

    descriptor.SetSecurityDescriptorDacl(1, acl, 0)
    win32security.SetFileSecurity(fname, dacl_info, descriptor)
    return None
def _win32_restrict_file_to_user_ctypes(fname: str) -> None:
    """Restrict a Windows file to the current user and administrators via ctypes.

    This is the fallback win32_restrict_file_to_user() uses when ``win32api``
    cannot be imported. It talks to ``advapi32`` and ``secur32`` directly and
    replaces the file's DACL with one that grants the built-in Administrators
    group FILE_ALL_ACCESS and the current user generic read, generic write and
    delete rights.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure

    Raises
    ------
    OSError
        Built by ``ctypes.WinError`` whenever one of the wrapped Win32 calls
        reports failure (see ``_nonzero_success``), except for the expected
        "buffer too small" results of the size-probing calls.
    """
    import ctypes
    from ctypes import wintypes

    advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)  # type:ignore[attr-defined]
    secur32 = ctypes.WinDLL("secur32", use_last_error=True)  # type:ignore[attr-defined]

    # Numeric values of the Win32 constants used below.
    NameSamCompatible = 2
    WinBuiltinAdministratorsSid = 26
    DACL_SECURITY_INFORMATION = 4
    ACL_REVISION = 2
    ERROR_INSUFFICIENT_BUFFER = 122
    ERROR_MORE_DATA = 234

    # Access-right bits, and the composite masks built from them.
    SYNCHRONIZE = 0x100000
    DELETE = 0x00010000
    STANDARD_RIGHTS_REQUIRED = 0xF0000
    STANDARD_RIGHTS_READ = 0x20000
    STANDARD_RIGHTS_WRITE = 0x20000
    FILE_READ_DATA = 1
    FILE_READ_EA = 8
    FILE_READ_ATTRIBUTES = 128
    FILE_WRITE_DATA = 2
    FILE_APPEND_DATA = 4
    FILE_WRITE_EA = 16
    FILE_WRITE_ATTRIBUTES = 256
    FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
    FILE_GENERIC_READ = (
        STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
    )
    FILE_GENERIC_WRITE = (
        STANDARD_RIGHTS_WRITE
        | FILE_WRITE_DATA
        | FILE_WRITE_ATTRIBUTES
        | FILE_WRITE_EA
        | FILE_APPEND_DATA
        | SYNCHRONIZE
    )

    # ctypes layout of the ACL header structure.
    class ACL(ctypes.Structure):
        _fields_ = [
            ("AclRevision", wintypes.BYTE),
            ("Sbz1", wintypes.BYTE),
            ("AclSize", wintypes.WORD),
            ("AceCount", wintypes.WORD),
            ("Sbz2", wintypes.WORD),
        ]

    # Pointer type aliases used in the argtypes declarations below.
    PSID = ctypes.c_void_p
    PACL = ctypes.POINTER(ACL)
    PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)

    def _nonzero_success(result: int, func: Any, args: Any) -> Any:  # noqa: ARG001
        # errcheck hook shared by every wrapped function: a falsy result is
        # turned into the exception ctypes builds from the thread's last
        # error code; otherwise the original arguments are returned.
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())  # type:ignore[attr-defined]
        return args

    # Declare errcheck, restype and argtypes for every function that is called.
    secur32.GetUserNameExW.errcheck = _nonzero_success
    secur32.GetUserNameExW.restype = wintypes.BOOL
    secur32.GetUserNameExW.argtypes = (
        ctypes.c_int,  # EXTENDED_NAME_FORMAT NameFormat
        wintypes.LPWSTR,  # LPWSTR lpNameBuffer,
        wintypes.PULONG,  # PULONG nSize
    )

    advapi32.CreateWellKnownSid.errcheck = _nonzero_success
    advapi32.CreateWellKnownSid.restype = wintypes.BOOL
    advapi32.CreateWellKnownSid.argtypes = (
        wintypes.DWORD,  # WELL_KNOWN_SID_TYPE WellKnownSidType
        PSID,  # PSID DomainSid
        PSID,  # PSID pSid
        wintypes.PDWORD,  # DWORD *cbSid
    )

    advapi32.LookupAccountNameW.errcheck = _nonzero_success
    advapi32.LookupAccountNameW.restype = wintypes.BOOL
    advapi32.LookupAccountNameW.argtypes = (
        wintypes.LPWSTR,  # LPCWSTR lpSystemName
        wintypes.LPWSTR,  # LPCWSTR lpAccountName
        PSID,  # PSID Sid
        wintypes.LPDWORD,  # LPDWORD cbSid
        wintypes.LPWSTR,  # LPCWSTR ReferencedDomainName
        wintypes.LPDWORD,  # LPDWORD cchReferencedDomainName
        wintypes.LPDWORD,  # PSID_NAME_USE peUse
    )

    advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
    advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
    advapi32.AddAccessAllowedAce.argtypes = (
        PACL,  # PACL pAcl
        wintypes.DWORD,  # DWORD dwAceRevision
        wintypes.DWORD,  # DWORD AccessMask
        PSID,  # PSID pSid
    )

    advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
    advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
    advapi32.SetSecurityDescriptorDacl.argtypes = (
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.BOOL,  # BOOL bDaclPresent
        PACL,  # PACL pDacl
        wintypes.BOOL,  # BOOL bDaclDefaulted
    )

    advapi32.GetFileSecurityW.errcheck = _nonzero_success
    advapi32.GetFileSecurityW.restype = wintypes.BOOL
    advapi32.GetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION RequestedInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.DWORD,  # DWORD nLength
        wintypes.LPDWORD,  # LPDWORD lpnLengthNeeded
    )

    advapi32.SetFileSecurityW.errcheck = _nonzero_success
    advapi32.SetFileSecurityW.restype = wintypes.BOOL
    advapi32.SetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION SecurityInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
    )

    advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
    advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
    advapi32.MakeAbsoluteSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwAbsoluteSecurityDescriptorSize
        PACL,  # PACL pDacl
        wintypes.LPDWORD,  # LPDWORD lpdwDaclSize
        PACL,  # PACL pSacl
        wintypes.LPDWORD,  # LPDWORD lpdwSaclSize
        PSID,  # PSID pOwner
        wintypes.LPDWORD,  # LPDWORD lpdwOwnerSize
        PSID,  # PSID pPrimaryGroup
        wintypes.LPDWORD,  # LPDWORD lpdwPrimaryGroupSize
    )

    advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
    advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
    advapi32.MakeSelfRelativeSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwBufferLength
    )

    advapi32.InitializeAcl.errcheck = _nonzero_success
    advapi32.InitializeAcl.restype = wintypes.BOOL
    advapi32.InitializeAcl.argtypes = (
        PACL,  # PACL pAcl,
        wintypes.DWORD,  # DWORD nAclLength,
        wintypes.DWORD,  # DWORD dwAclRevision
    )

    # Most helpers below call their Win32 function twice: a first probing
    # call that is allowed to fail with a "buffer too small" error while
    # reporting the required size, then a second call with a real buffer.

    def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
        # return a SID for predefined aliases
        pSid = (ctypes.c_char * 1)()
        cbSid = wintypes.DWORD()
        try:
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
            # retry with a buffer of the size reported in cbSid
            pSid = (ctypes.c_char * cbSid.value)()
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        # the SID is handed back as a copy of the buffer contents
        return pSid[:]

    def GetUserNameEx(NameFormat: Any) -> Any:
        # return the user or other security principal associated with
        # the calling thread
        nSize = ctypes.pointer(ctypes.c_ulong(0))
        try:
            secur32.GetUserNameExW(NameFormat, None, nSize)
        except OSError as e:
            if e.winerror != ERROR_MORE_DATA:  # type:ignore[attr-defined]
                raise
        # a reported size of zero means there is nothing to fetch
        if not nSize.contents.value:
            return None
        lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
        secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
        return lpNameBuffer.value

    def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
        # return a security identifier (SID) for an account on a system
        # and the name of the domain on which the account was found
        cbSid = wintypes.DWORD(0)
        cchReferencedDomainName = wintypes.DWORD(0)
        peUse = wintypes.DWORD(0)
        try:
            advapi32.LookupAccountNameW(
                lpSystemName,
                lpAccountName,
                None,
                ctypes.byref(cbSid),
                None,
                ctypes.byref(cchReferencedDomainName),
                ctypes.byref(peUse),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # allocate buffers sized from the probe, then pass the SID buffer as
        # an untyped pointer
        Sid = ctypes.create_unicode_buffer("", cbSid.value)
        pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
        lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
        success = advapi32.LookupAccountNameW(
            lpSystemName,
            lpAccountName,
            pSid,
            ctypes.byref(cbSid),
            lpReferencedDomainName,
            ctypes.byref(cchReferencedDomainName),
            ctypes.byref(peUse),
        )
        if not success:
            raise ctypes.WinError()  # type:ignore[attr-defined]
        # (pointer to SID, domain name, account-type code)
        return pSid, lpReferencedDomainName.value, peUse.value

    def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
        # add an access-allowed access control entry (ACE)
        # to an access control list (ACL)
        advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)

    def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
        # return information about the security of a file or directory
        nLength = wintypes.DWORD(0)
        try:
            advapi32.GetFileSecurityW(
                lpFileName,
                RequestedInformation,
                None,
                0,
                ctypes.byref(nLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # a reported length of zero means there is no descriptor to read
        if not nLength.value:
            return None
        pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
        advapi32.GetFileSecurityW(
            lpFileName,
            RequestedInformation,
            pSecurityDescriptor,
            nLength,
            ctypes.byref(nLength),
        )
        return pSecurityDescriptor

    def SetFileSecurity(
        lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
    ) -> Any:
        # set the security of a file or directory object
        advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)

    def SetSecurityDescriptorDacl(
        pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
    ) -> Any:
        # set information in a discretionary access control list (DACL)
        advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)

    def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
        # return a security descriptor in absolute format
        # by using a security descriptor in self-relative format as a template
        pAbsoluteSecurityDescriptor = None
        lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
        pDacl = None
        lpdwDaclSize = wintypes.DWORD(0)
        pSacl = None
        lpdwSaclSize = wintypes.DWORD(0)
        pOwner = None
        lpdwOwnerSize = wintypes.DWORD(0)
        pPrimaryGroup = None
        lpdwPrimaryGroupSize = wintypes.DWORD(0)
        # probing call: every buffer is None, only the sizes are filled in
        try:
            advapi32.MakeAbsoluteSD(
                pSelfRelativeSecurityDescriptor,
                pAbsoluteSecurityDescriptor,
                ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
                pDacl,
                ctypes.byref(lpdwDaclSize),
                pSacl,
                ctypes.byref(lpdwSaclSize),
                pOwner,
                ctypes.byref(lpdwOwnerSize),
                pPrimaryGroup,
                ctypes.byref(lpdwPrimaryGroupSize),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # allocate one buffer per component using the sizes reported above
        pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
        pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
        pDacl = ctypes.cast(pDaclData, PACL).contents
        pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
        pSacl = ctypes.cast(pSaclData, PACL).contents
        pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
        pOwner = ctypes.cast(pOwnerData, PSID)
        pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
        pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
        # NOTE(review): lpdwOwnerSize is passed below without ctypes.byref,
        # unlike the other size arguments; ctypes is documented to apply
        # byref automatically for POINTER argtypes - confirm this is intended.
        advapi32.MakeAbsoluteSD(
            pSelfRelativeSecurityDescriptor,
            pAbsoluteSecurityDescriptor,
            ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
            pDacl,
            ctypes.byref(lpdwDaclSize),
            pSacl,
            ctypes.byref(lpdwSaclSize),
            pOwner,
            lpdwOwnerSize,
            pPrimaryGroup,
            ctypes.byref(lpdwPrimaryGroupSize),
        )
        return pAbsoluteSecurityDescriptor

    def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
        # return a security descriptor in self-relative format
        # by using a security descriptor in absolute format as a template
        pSelfRelativeSecurityDescriptor = None
        lpdwBufferLength = wintypes.DWORD(0)
        try:
            advapi32.MakeSelfRelativeSD(
                pAbsoluteSecurityDescriptor,
                pSelfRelativeSecurityDescriptor,
                ctypes.byref(lpdwBufferLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            pSelfRelativeSecurityDescriptor,
            ctypes.byref(lpdwBufferLength),
        )
        return pSelfRelativeSecurityDescriptor

    def NewAcl() -> Any:
        # return a new, initialized ACL (access control list) structure
        nAclLength = 32767  # TODO: calculate this: ctypes.sizeof(ACL) + ?
        acl_data = ctypes.create_string_buffer(nAclLength)
        pAcl = ctypes.cast(acl_data, PACL).contents
        advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
        return pAcl

    # Resolve the two principals that receive access.
    SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
    SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]

    # New ACL: full access for administrators, read/write/delete for the user.
    Acl = NewAcl()
    AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
    AddAccessAllowedAce(
        Acl,
        ACL_REVISION,
        FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
        SidUser,
    )

    # Read the file's descriptor, convert it to absolute form to swap in the
    # new DACL, then convert back to self-relative form before writing it.
    SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
    AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
    SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
    SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)

    SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
def get_file_mode(fname: str) -> int:
    """Return the permission bits of ``fname`` in a filesystem-tolerant manner.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    Returns
    -------
    int
        The file's permission bits with the owner-execute bit and the sticky
        bit masked out.
    """
    permission_bits = stat.S_IMODE(Path(fname).stat().st_mode)
    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so
    # the owner's execute bit is tolerated when validating permissions: the
    # mask drops the least significant bit of the third octal digit. The
    # sticky bit is tolerated as well, so the least significant bit of the
    # fourth octal digit is dropped too. Four octal digits are used because
    # S_IMODE does the same.
    return permission_bits & 0o6677
# True when JUPYTER_ALLOW_INSECURE_WRITES is set to "true" or "1" (any case).
allow_insecure_writes = os.environ.get("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in (
    "true",
    "1",
)
@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open ``fname`` for writing with the most restrictive permissions
    available and yield the resulting open file handle.

    Any existing file at ``fname`` is removed first. The new file is created
    with mode `0o0600`. Off Windows that mode is verified before the handle
    is yielded; on Windows the file's ACL is restricted instead. Both
    safeguards are replaced by a warning when insecure writes are allowed.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary

    Raises
    ------
    RuntimeError
        If the file did not get mode `0o0600` and insecure writes are not
        allowed (non-Windows only).
    """
    if binary:
        mode, encoding = "wb", None
    else:
        mode, encoding = "w", "utf-8"
    flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    try:
        Path(fname).unlink()
    except OSError:
        # Skip any issues with the file not existing
        pass

    on_windows = os.name == "nt"

    if on_windows:
        if allow_insecure_writes:
            # Mounted file systems can have a number of failure modes inside this block.
            # For windows machines in insecure mode we simply skip this to avoid failures :/
            issue_insecure_write_warning()
        else:
            # Python on windows does not respect the group and public bits for chmod, so we need
            # to take additional steps to secure the contents.
            # Touch file pre-emptively to avoid editing permissions in open files in Windows
            os.close(os.open(fname, flags, 0o0600))
            flags = os.O_WRONLY | os.O_TRUNC
            win32_restrict_file_to_user(fname)

    with os.fdopen(os.open(fname, flags, 0o0600), mode, encoding=encoding) as handle:
        if not on_windows:
            # Enforce that the file got the requested permissions before writing
            file_mode = get_file_mode(fname)
            if file_mode != 0o0600:
                if not allow_insecure_writes:
                    msg = (
                        f"Permissions assignment failed for secure file: '{fname}'."
                        f" Got '{oct(file_mode)}' instead of '0o0600'."
                    )
                    raise RuntimeError(msg)
                issue_insecure_write_warning()
        yield handle
def issue_insecure_write_warning() -> None:
    """Warn that insecure writes are enabled.

    As a side effect this replaces ``warnings.formatwarning`` with a
    formatter that prints only the message text followed by a newline.
    """

    def _message_only(msg: str, *args: Any, **kwargs: Any) -> str:  # noqa: ARG001
        return "".join((str(msg), "\n"))

    warnings.formatwarning = _message_only  # type:ignore[assignment]

    text = (
        "WARNING: Insecure writes have been enabled via environment variable "
        "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
        "variable or set its value to 'False'."
    )
    warnings.warn(text, stacklevel=2)