Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 12%
442 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
1"""Path utility functions."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any, Dict, Iterator, List, Optional
22import platformdirs
24from .utils import deprecation
26pjoin = os.path.join
28# Capitalize Jupyter in paths only on Windows and MacOS (when not in Homebrew)
29if sys.platform == "win32" or (
30 sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew")
31):
32 APPNAME = "Jupyter"
33else:
34 APPNAME = "jupyter"
36# UF_HIDDEN is a stat flag not defined in the stat module.
37# It is used by BSD to indicate hidden files.
38UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)
41def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
42 """Return the boolean value of a given environment variable.
44 An environment variable is considered set if it is assigned to a value
45 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive)
47 If the environment variable is not defined, the default value is returned.
48 """
49 if name not in os.environ:
50 return default
52 return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"]
55def use_platform_dirs() -> bool:
56 """Determine if platformdirs should be used for system-specific paths.
58 We plan for this to default to False in jupyter_core version 5 and to True
59 in jupyter_core version 6.
60 """
61 return envset("JUPYTER_PLATFORM_DIRS", False) # type:ignore[return-value]
64def get_home_dir() -> str:
65 """Get the real path of the home directory"""
66 homedir = os.path.expanduser("~")
67 # Next line will make things work even when /home/ is a symlink to
68 # /usr/home as it is on FreeBSD, for example
69 homedir = str(Path(homedir).resolve())
70 return homedir
73_dtemps: Dict[str, str] = {}
76def _do_i_own(path: str) -> bool:
77 """Return whether the current user owns the given path"""
78 p = Path(path).resolve()
80 # walk up to first existing parent
81 while not p.exists() and p != p.parent:
82 p = p.parent
84 # simplest check: owner by name
85 # not always implemented or available
86 try:
87 return p.owner() == os.getlogin()
88 except Exception: # noqa
89 pass
91 if hasattr(os, "geteuid"):
92 try:
93 st = p.stat()
94 return st.st_uid == os.geteuid()
95 except (NotImplementedError, OSError):
96 # geteuid not always implemented
97 pass
99 # no ownership checks worked, check write access
100 return os.access(p, os.W_OK)
103def prefer_environment_over_user() -> bool:
104 """Determine if environment-level paths should take precedence over user-level paths."""
105 # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value
106 if "JUPYTER_PREFER_ENV_PATH" in os.environ:
107 return envset("JUPYTER_PREFER_ENV_PATH") # type:ignore[return-value]
109 # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def)
110 if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix):
111 return True
113 # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True
114 if (
115 "CONDA_PREFIX" in os.environ
116 and sys.prefix.startswith(os.environ["CONDA_PREFIX"])
117 and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
118 and _do_i_own(sys.prefix)
119 ):
120 return True
122 return False
125def _mkdtemp_once(name: str) -> str:
126 """Make or reuse a temporary directory.
128 If this is called with the same name in the same process, it will return
129 the same directory.
130 """
131 try:
132 return _dtemps[name]
133 except KeyError:
134 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
135 return d
138def jupyter_config_dir() -> str:
139 """Get the Jupyter config directory for this platform and user.
141 Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate
142 directory for the platform.
143 """
145 env = os.environ
146 if env.get("JUPYTER_NO_CONFIG"):
147 return _mkdtemp_once("jupyter-clean-cfg")
149 if env.get("JUPYTER_CONFIG_DIR"):
150 return env["JUPYTER_CONFIG_DIR"]
152 if use_platform_dirs():
153 return platformdirs.user_config_dir(APPNAME, appauthor=False)
155 home_dir = get_home_dir()
156 return pjoin(home_dir, ".jupyter")
159def jupyter_data_dir() -> str:
160 """Get the config directory for Jupyter data files for this platform and user.
162 These are non-transient, non-configuration files.
164 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path.
165 """
166 env = os.environ
168 if env.get("JUPYTER_DATA_DIR"):
169 return env["JUPYTER_DATA_DIR"]
171 if use_platform_dirs():
172 return platformdirs.user_data_dir(APPNAME, appauthor=False)
174 home = get_home_dir()
176 if sys.platform == "darwin":
177 return os.path.join(home, "Library", "Jupyter")
178 elif os.name == "nt":
179 appdata = os.environ.get("APPDATA", None)
180 if appdata:
181 return str(Path(appdata, "jupyter").resolve())
182 else:
183 return pjoin(jupyter_config_dir(), "data")
184 else:
185 # Linux, non-OS X Unix, AIX, etc.
186 xdg = env.get("XDG_DATA_HOME", None)
187 if not xdg:
188 xdg = pjoin(home, ".local", "share")
189 return pjoin(xdg, "jupyter")
192def jupyter_runtime_dir() -> str:
193 """Return the runtime dir for transient jupyter files.
195 Returns JUPYTER_RUNTIME_DIR if defined.
197 The default is now (data_dir)/runtime on all platforms;
198 we no longer use XDG_RUNTIME_DIR after various problems.
199 """
200 env = os.environ
202 if env.get("JUPYTER_RUNTIME_DIR"):
203 return env["JUPYTER_RUNTIME_DIR"]
205 return pjoin(jupyter_data_dir(), "runtime")
208if use_platform_dirs():
209 SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
210 APPNAME, appauthor=False, multipath=True
211 ).split(os.pathsep)
212else:
213 deprecation(
214 "Jupyter is migrating its paths to use standard platformdirs\n"
215 "given by the platformdirs library. To remove this warning and\n"
216 "see the appropriate new directories, set the environment variable\n"
217 "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
218 "The use of platformdirs will be the default in `jupyter_core` v6"
219 )
220 if os.name == "nt":
221 programdata = os.environ.get("PROGRAMDATA", None)
222 if programdata:
223 SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")]
224 else: # PROGRAMDATA is not defined by default on XP.
225 SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, "share", "jupyter")]
226 else:
227 SYSTEM_JUPYTER_PATH = [
228 "/usr/local/share/jupyter",
229 "/usr/share/jupyter",
230 ]
232ENV_JUPYTER_PATH: List[str] = [os.path.join(sys.prefix, "share", "jupyter")]
235def jupyter_path(*subdirs: str) -> List[str]:
236 """Return a list of directories to search for data files
238 JUPYTER_PATH environment variable has highest priority.
240 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level
241 directories will have priority over user-level directories.
243 If the Python site.ENABLE_USER_SITE variable is True, we also add the
244 appropriate Python user site subdirectory to the user-level directories.
247 If ``*subdirs`` are given, that subdirectory will be added to each element.
249 Examples:
251 >>> jupyter_path()
252 ['~/.local/jupyter', '/usr/local/share/jupyter']
253 >>> jupyter_path('kernels')
254 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels']
255 """
257 paths: List[str] = []
259 # highest priority is explicit environment variable
260 if os.environ.get("JUPYTER_PATH"):
261 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep))
263 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
264 user = [jupyter_data_dir()]
265 if site.ENABLE_USER_SITE:
266 # Check if site.getuserbase() exists to be compatible with virtualenv,
267 # which often does not have this method.
268 userbase: Optional[str]
269 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
271 if userbase:
272 userdir = os.path.join(userbase, "share", "jupyter")
273 if userdir not in user:
274 user.append(userdir)
276 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]
278 if prefer_environment_over_user():
279 paths.extend(env)
280 paths.extend(user)
281 else:
282 paths.extend(user)
283 paths.extend(env)
285 # finally, system
286 paths.extend(SYSTEM_JUPYTER_PATH)
288 # add subdir, if requested
289 if subdirs:
290 paths = [pjoin(p, *subdirs) for p in paths]
291 return paths
294if use_platform_dirs():
295 SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
296 APPNAME, appauthor=False, multipath=True
297 ).split(os.pathsep)
298else: # noqa: PLR5501
299 if os.name == "nt":
300 programdata = os.environ.get("PROGRAMDATA", None)
301 if programdata: # noqa
302 SYSTEM_CONFIG_PATH = [os.path.join(programdata, "jupyter")]
303 else: # PROGRAMDATA is not defined by default on XP.
304 SYSTEM_CONFIG_PATH = []
305 else:
306 SYSTEM_CONFIG_PATH = [
307 "/usr/local/etc/jupyter",
308 "/etc/jupyter",
309 ]
310ENV_CONFIG_PATH: List[str] = [os.path.join(sys.prefix, "etc", "jupyter")]
313def jupyter_config_path() -> List[str]:
314 """Return the search path for Jupyter config files as a list.
316 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
317 environment-level directories will have priority over user-level
318 directories.
320 If the Python site.ENABLE_USER_SITE variable is True, we also add the
321 appropriate Python user site subdirectory to the user-level directories.
322 """
323 if os.environ.get("JUPYTER_NO_CONFIG"):
324 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
325 return [jupyter_config_dir()]
327 paths: List[str] = []
329 # highest priority is explicit environment variable
330 if os.environ.get("JUPYTER_CONFIG_PATH"):
331 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep))
333 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
334 user = [jupyter_config_dir()]
335 if site.ENABLE_USER_SITE:
336 userbase: Optional[str]
337 # Check if site.getuserbase() exists to be compatible with virtualenv,
338 # which often does not have this method.
339 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
341 if userbase:
342 userdir = os.path.join(userbase, "etc", "jupyter")
343 if userdir not in user:
344 user.append(userdir)
346 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]
348 if prefer_environment_over_user():
349 paths.extend(env)
350 paths.extend(user)
351 else:
352 paths.extend(user)
353 paths.extend(env)
355 # Finally, system path
356 paths.extend(SYSTEM_CONFIG_PATH)
357 return paths
360def exists(path: str) -> bool:
361 """Replacement for `os.path.exists` which works for host mapped volumes
362 on Windows containers
363 """
364 try:
365 os.lstat(path)
366 except OSError:
367 return False
368 return True
371def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
372 """Is a file hidden?
374 This only checks the file itself; it should be called in combination with
375 checking the directory containing the file.
377 Use is_hidden() instead to check the file and its parent directories.
379 Parameters
380 ----------
381 abs_path : unicode
382 The absolute path to check.
383 stat_res : os.stat_result, optional
384 The result of calling stat() on abs_path. If not passed, this function
385 will call stat() internally.
386 """
387 if os.path.basename(abs_path).startswith("."):
388 return True
390 if stat_res is None:
391 try:
392 stat_res = os.stat(abs_path)
393 except OSError as e:
394 if e.errno == errno.ENOENT:
395 return False
396 raise
398 try:
399 if (
400 stat_res.st_file_attributes # type:ignore[union-attr]
401 & stat.FILE_ATTRIBUTE_HIDDEN # type:ignore[attr-defined]
402 ):
403 return True
404 except AttributeError:
405 # allow AttributeError on PyPy for Windows
406 # 'stat_result' object has no attribute 'st_file_attributes'
407 # https://foss.heptapod.net/pypy/pypy/-/issues/3469
408 warnings.warn(
409 "hidden files are not detectable on this system, so no file will be marked as hidden.",
410 stacklevel=2,
411 )
412 pass
414 return False
417def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
418 """Is a file hidden?
420 This only checks the file itself; it should be called in combination with
421 checking the directory containing the file.
423 Use is_hidden() instead to check the file and its parent directories.
425 Parameters
426 ----------
427 abs_path : unicode
428 The absolute path to check.
429 stat_res : os.stat_result, optional
430 The result of calling stat() on abs_path. If not passed, this function
431 will call stat() internally.
432 """
433 if os.path.basename(abs_path).startswith("."):
434 return True
436 if stat_res is None or stat.S_ISLNK(stat_res.st_mode):
437 try:
438 stat_res = os.stat(abs_path)
439 except OSError as e:
440 if e.errno == errno.ENOENT:
441 return False
442 raise
444 # check that dirs can be listed
445 if stat.S_ISDIR(stat_res.st_mode): # type:ignore[misc] # noqa
446 # use x-access, not actual listing, in case of slow/large listings
447 if not os.access(abs_path, os.X_OK | os.R_OK):
448 return True
450 # check UF_HIDDEN
451 if getattr(stat_res, "st_flags", 0) & UF_HIDDEN:
452 return True
454 return False
457if sys.platform == "win32":
458 is_file_hidden = is_file_hidden_win
459else:
460 is_file_hidden = is_file_hidden_posix
463def is_hidden(abs_path: str, abs_root: str = "") -> bool:
464 """Is a file hidden or contained in a hidden directory?
466 This will start with the rightmost path element and work backwards to the
467 given root to see if a path is hidden or in a hidden directory. Hidden is
468 determined by either name starting with '.' or the UF_HIDDEN flag as
469 reported by stat.
471 If abs_path is the same directory as abs_root, it will be visible even if
472 that is a hidden folder. This only checks the visibility of files
473 and directories *within* abs_root.
475 Parameters
476 ----------
477 abs_path : unicode
478 The absolute path to check for hidden directories.
479 abs_root : unicode
480 The absolute path of the root directory in which hidden directories
481 should be checked for.
482 """
483 abs_path = os.path.normpath(abs_path)
484 abs_root = os.path.normpath(abs_root)
486 if abs_path == abs_root:
487 return False
489 if is_file_hidden(abs_path):
490 return True
492 if not abs_root:
493 abs_root = abs_path.split(os.sep, 1)[0] + os.sep
494 inside_root = abs_path[len(abs_root) :]
495 if any(part.startswith(".") for part in inside_root.split(os.sep)):
496 return True
498 # check UF_HIDDEN on any location up to root.
499 # is_file_hidden() already checked the file, so start from its parent dir
500 path = os.path.dirname(abs_path)
501 while path and path.startswith(abs_root) and path != abs_root:
502 if not exists(path):
503 path = os.path.dirname(path)
504 continue
505 try:
506 # may fail on Windows junctions
507 st = os.lstat(path)
508 except OSError:
509 return True
510 if getattr(st, "st_flags", 0) & UF_HIDDEN:
511 return True
512 path = os.path.dirname(path)
514 return False
517def win32_restrict_file_to_user(fname: str) -> None:
518 """Secure a windows file to read-only access for the user.
519 Follows guidance from win32 library creator:
520 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
522 This method should be executed against an already generated file which
523 has no secrets written to it yet.
525 Parameters
526 ----------
528 fname : unicode
529 The path to the file to secure
530 """
531 try:
532 import win32api
533 except ImportError:
534 return _win32_restrict_file_to_user_ctypes(fname)
536 import ntsecuritycon as con
537 import win32security
539 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone")
540 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
541 user, _domain, _type = win32security.LookupAccountName(
542 "", win32api.GetUserNameEx(win32api.NameSamCompatible)
543 )
545 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)
547 dacl = win32security.ACL()
548 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone)
549 dacl.AddAccessAllowedAce(
550 win32security.ACL_REVISION,
551 con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE,
552 user,
553 )
554 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)
556 sd.SetSecurityDescriptorDacl(1, dacl, 0)
557 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)
560def _win32_restrict_file_to_user_ctypes(fname: str) -> None: # noqa
561 """Secure a windows file to read-only access for the user.
563 Follows guidance from win32 library creator:
564 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
566 This method should be executed against an already generated file which
567 has no secrets written to it yet.
569 Parameters
570 ----------
572 fname : unicode
573 The path to the file to secure
574 """
575 import ctypes
576 from ctypes import wintypes
578 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined]
579 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined]
581 NameSamCompatible = 2
582 WinBuiltinAdministratorsSid = 26
583 DACL_SECURITY_INFORMATION = 4
584 ACL_REVISION = 2
585 ERROR_INSUFFICIENT_BUFFER = 122
586 ERROR_MORE_DATA = 234
588 SYNCHRONIZE = 0x100000
589 DELETE = 0x00010000
590 STANDARD_RIGHTS_REQUIRED = 0xF0000
591 STANDARD_RIGHTS_READ = 0x20000
592 STANDARD_RIGHTS_WRITE = 0x20000
593 FILE_READ_DATA = 1
594 FILE_READ_EA = 8
595 FILE_READ_ATTRIBUTES = 128
596 FILE_WRITE_DATA = 2
597 FILE_APPEND_DATA = 4
598 FILE_WRITE_EA = 16
599 FILE_WRITE_ATTRIBUTES = 256
600 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
601 FILE_GENERIC_READ = (
602 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
603 )
604 FILE_GENERIC_WRITE = (
605 STANDARD_RIGHTS_WRITE
606 | FILE_WRITE_DATA
607 | FILE_WRITE_ATTRIBUTES
608 | FILE_WRITE_EA
609 | FILE_APPEND_DATA
610 | SYNCHRONIZE
611 )
613 class ACL(ctypes.Structure):
614 _fields_ = [ # noqa
615 ("AclRevision", wintypes.BYTE),
616 ("Sbz1", wintypes.BYTE),
617 ("AclSize", wintypes.WORD),
618 ("AceCount", wintypes.WORD),
619 ("Sbz2", wintypes.WORD),
620 ]
622 PSID = ctypes.c_void_p
623 PACL = ctypes.POINTER(ACL)
624 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)
626 def _nonzero_success(result: int, func: Any, args: Any) -> Any:
627 if not result:
628 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined]
629 return args
631 secur32.GetUserNameExW.errcheck = _nonzero_success
632 secur32.GetUserNameExW.restype = wintypes.BOOL
633 secur32.GetUserNameExW.argtypes = (
634 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat
635 wintypes.LPWSTR, # LPWSTR lpNameBuffer,
636 wintypes.PULONG, # PULONG nSize
637 )
639 advapi32.CreateWellKnownSid.errcheck = _nonzero_success
640 advapi32.CreateWellKnownSid.restype = wintypes.BOOL
641 advapi32.CreateWellKnownSid.argtypes = (
642 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType
643 PSID, # PSID DomainSid
644 PSID, # PSID pSid
645 wintypes.PDWORD, # DWORD *cbSid
646 )
648 advapi32.LookupAccountNameW.errcheck = _nonzero_success
649 advapi32.LookupAccountNameW.restype = wintypes.BOOL
650 advapi32.LookupAccountNameW.argtypes = (
651 wintypes.LPWSTR, # LPCWSTR lpSystemName
652 wintypes.LPWSTR, # LPCWSTR lpAccountName
653 PSID, # PSID Sid
654 wintypes.LPDWORD, # LPDWORD cbSid
655 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName
656 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName
657 wintypes.LPDWORD, # PSID_NAME_USE peUse
658 )
660 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
661 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
662 advapi32.AddAccessAllowedAce.argtypes = (
663 PACL, # PACL pAcl
664 wintypes.DWORD, # DWORD dwAceRevision
665 wintypes.DWORD, # DWORD AccessMask
666 PSID, # PSID pSid
667 )
669 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
670 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
671 advapi32.SetSecurityDescriptorDacl.argtypes = (
672 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
673 wintypes.BOOL, # BOOL bDaclPresent
674 PACL, # PACL pDacl
675 wintypes.BOOL, # BOOL bDaclDefaulted
676 )
678 advapi32.GetFileSecurityW.errcheck = _nonzero_success
679 advapi32.GetFileSecurityW.restype = wintypes.BOOL
680 advapi32.GetFileSecurityW.argtypes = (
681 wintypes.LPCWSTR, # LPCWSTR lpFileName
682 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation
683 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
684 wintypes.DWORD, # DWORD nLength
685 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded
686 )
688 advapi32.SetFileSecurityW.errcheck = _nonzero_success
689 advapi32.SetFileSecurityW.restype = wintypes.BOOL
690 advapi32.SetFileSecurityW.argtypes = (
691 wintypes.LPCWSTR, # LPCWSTR lpFileName
692 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation
693 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
694 )
696 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
697 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
698 advapi32.MakeAbsoluteSD.argtypes = (
699 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
700 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
701 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize
702 PACL, # PACL pDacl
703 wintypes.LPDWORD, # LPDWORD lpdwDaclSize
704 PACL, # PACL pSacl
705 wintypes.LPDWORD, # LPDWORD lpdwSaclSize
706 PSID, # PSID pOwner
707 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize
708 PSID, # PSID pPrimaryGroup
709 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize
710 )
712 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
713 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
714 advapi32.MakeSelfRelativeSD.argtypes = (
715 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
716 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
717 wintypes.LPDWORD, # LPDWORD lpdwBufferLength
718 )
720 advapi32.InitializeAcl.errcheck = _nonzero_success
721 advapi32.InitializeAcl.restype = wintypes.BOOL
722 advapi32.InitializeAcl.argtypes = (
723 PACL, # PACL pAcl,
724 wintypes.DWORD, # DWORD nAclLength,
725 wintypes.DWORD, # DWORD dwAclRevision
726 )
728 def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
729 # return a SID for predefined aliases
730 pSid = (ctypes.c_char * 1)()
731 cbSid = wintypes.DWORD()
732 try:
733 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
734 except OSError as e:
735 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
736 raise
737 pSid = (ctypes.c_char * cbSid.value)()
738 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
739 return pSid[:]
741 def GetUserNameEx(NameFormat: Any) -> Any:
742 # return the user or other security principal associated with
743 # the calling thread
744 nSize = ctypes.pointer(ctypes.c_ulong(0))
745 try:
746 secur32.GetUserNameExW(NameFormat, None, nSize)
747 except OSError as e:
748 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined]
749 raise
750 if not nSize.contents.value:
751 return None
752 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
753 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
754 return lpNameBuffer.value
756 def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
757 # return a security identifier (SID) for an account on a system
758 # and the name of the domain on which the account was found
759 cbSid = wintypes.DWORD(0)
760 cchReferencedDomainName = wintypes.DWORD(0)
761 peUse = wintypes.DWORD(0)
762 try:
763 advapi32.LookupAccountNameW(
764 lpSystemName,
765 lpAccountName,
766 None,
767 ctypes.byref(cbSid),
768 None,
769 ctypes.byref(cchReferencedDomainName),
770 ctypes.byref(peUse),
771 )
772 except OSError as e:
773 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
774 raise
775 Sid = ctypes.create_unicode_buffer("", cbSid.value)
776 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
777 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
778 success = advapi32.LookupAccountNameW(
779 lpSystemName,
780 lpAccountName,
781 pSid,
782 ctypes.byref(cbSid),
783 lpReferencedDomainName,
784 ctypes.byref(cchReferencedDomainName),
785 ctypes.byref(peUse),
786 )
787 if not success:
788 raise ctypes.WinError() # type:ignore[attr-defined]
789 return pSid, lpReferencedDomainName.value, peUse.value
791 def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
792 # add an access-allowed access control entry (ACE)
793 # to an access control list (ACL)
794 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)
796 def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
797 # return information about the security of a file or directory
798 nLength = wintypes.DWORD(0)
799 try:
800 advapi32.GetFileSecurityW(
801 lpFileName,
802 RequestedInformation,
803 None,
804 0,
805 ctypes.byref(nLength),
806 )
807 except OSError as e:
808 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
809 raise
810 if not nLength.value:
811 return None
812 pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
813 advapi32.GetFileSecurityW(
814 lpFileName,
815 RequestedInformation,
816 pSecurityDescriptor,
817 nLength,
818 ctypes.byref(nLength),
819 )
820 return pSecurityDescriptor
822 def SetFileSecurity(
823 lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
824 ) -> Any:
825 # set the security of a file or directory object
826 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)
828 def SetSecurityDescriptorDacl(
829 pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
830 ) -> Any:
831 # set information in a discretionary access control list (DACL)
832 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)
834 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
835 # return a security descriptor in absolute format
836 # by using a security descriptor in self-relative format as a template
837 pAbsoluteSecurityDescriptor = None
838 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
839 pDacl = None
840 lpdwDaclSize = wintypes.DWORD(0)
841 pSacl = None
842 lpdwSaclSize = wintypes.DWORD(0)
843 pOwner = None
844 lpdwOwnerSize = wintypes.DWORD(0)
845 pPrimaryGroup = None
846 lpdwPrimaryGroupSize = wintypes.DWORD(0)
847 try:
848 advapi32.MakeAbsoluteSD(
849 pSelfRelativeSecurityDescriptor,
850 pAbsoluteSecurityDescriptor,
851 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
852 pDacl,
853 ctypes.byref(lpdwDaclSize),
854 pSacl,
855 ctypes.byref(lpdwSaclSize),
856 pOwner,
857 ctypes.byref(lpdwOwnerSize),
858 pPrimaryGroup,
859 ctypes.byref(lpdwPrimaryGroupSize),
860 )
861 except OSError as e:
862 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
863 raise
864 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
865 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
866 pDacl = ctypes.cast(pDaclData, PACL).contents
867 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
868 pSacl = ctypes.cast(pSaclData, PACL).contents
869 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
870 pOwner = ctypes.cast(pOwnerData, PSID)
871 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
872 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
873 advapi32.MakeAbsoluteSD(
874 pSelfRelativeSecurityDescriptor,
875 pAbsoluteSecurityDescriptor,
876 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
877 pDacl,
878 ctypes.byref(lpdwDaclSize),
879 pSacl,
880 ctypes.byref(lpdwSaclSize),
881 pOwner,
882 lpdwOwnerSize,
883 pPrimaryGroup,
884 ctypes.byref(lpdwPrimaryGroupSize),
885 )
886 return pAbsoluteSecurityDescriptor
888 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
889 # return a security descriptor in self-relative format
890 # by using a security descriptor in absolute format as a template
891 pSelfRelativeSecurityDescriptor = None
892 lpdwBufferLength = wintypes.DWORD(0)
893 try:
894 advapi32.MakeSelfRelativeSD(
895 pAbsoluteSecurityDescriptor,
896 pSelfRelativeSecurityDescriptor,
897 ctypes.byref(lpdwBufferLength),
898 )
899 except OSError as e:
900 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
901 raise
902 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
903 advapi32.MakeSelfRelativeSD(
904 pAbsoluteSecurityDescriptor,
905 pSelfRelativeSecurityDescriptor,
906 ctypes.byref(lpdwBufferLength),
907 )
908 return pSelfRelativeSecurityDescriptor
910 def NewAcl() -> Any:
911 # return a new, initialized ACL (access control list) structure
912 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ?
913 acl_data = ctypes.create_string_buffer(nAclLength)
914 pAcl = ctypes.cast(acl_data, PACL).contents
915 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
916 return pAcl
918 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
919 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]
921 Acl = NewAcl()
922 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
923 AddAccessAllowedAce(
924 Acl,
925 ACL_REVISION,
926 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
927 SidUser,
928 )
930 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
931 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
932 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
933 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)
935 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
938def get_file_mode(fname: str) -> int:
939 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner.
941 Parameters
942 ----------
944 fname : unicode
945 The path to the file to get mode from
947 """
948 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we
949 # should tolerate the execute bit on the file's owner when validating permissions - thus
950 # the missing least significant bit on the third octal digit. In addition, we also tolerate
951 # the sticky bit being set, so the lsb from the fourth octal digit is also removed.
952 return (
953 stat.S_IMODE(os.stat(fname).st_mode) & 0o6677
954 ) # Use 4 octal digits since S_IMODE does the same
957allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1")
960@contextmanager
961def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
962 """Opens a file in the most restricted pattern available for
963 writing content. This limits the file mode to `0o0600` and yields
964 the resulting opened filed handle.
966 Parameters
967 ----------
969 fname : unicode
970 The path to the file to write
972 binary: boolean
973 Indicates that the file is binary
974 """
975 mode = "wb" if binary else "w"
976 encoding = None if binary else "utf-8"
977 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
978 try:
979 os.remove(fname)
980 except OSError:
981 # Skip any issues with the file not existing
982 pass
984 if os.name == "nt":
985 if allow_insecure_writes:
986 # Mounted file systems can have a number of failure modes inside this block.
987 # For windows machines in insecure mode we simply skip this to avoid failures :/
988 issue_insecure_write_warning()
989 else:
990 # Python on windows does not respect the group and public bits for chmod, so we need
991 # to take additional steps to secure the contents.
992 # Touch file pre-emptively to avoid editing permissions in open files in Windows
993 fd = os.open(fname, open_flag, 0o0600)
994 os.close(fd)
995 open_flag = os.O_WRONLY | os.O_TRUNC
996 win32_restrict_file_to_user(fname)
998 with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
999 if os.name != "nt":
1000 # Enforce that the file got the requested permissions before writing
1001 file_mode = get_file_mode(fname)
1002 if file_mode != 0o0600: # noqa
1003 if allow_insecure_writes:
1004 issue_insecure_write_warning()
1005 else:
1006 msg = (
1007 f"Permissions assignment failed for secure file: '{fname}'."
1008 f" Got '{oct(file_mode)}' instead of '0o0600'."
1009 )
1010 raise RuntimeError(msg)
1011 yield f
1014def issue_insecure_write_warning() -> None:
1015 """Issue an insecure write warning."""
1017 def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:
1018 return str(msg) + "\n"
1020 warnings.formatwarning = format_warning # type:ignore[assignment]
1021 warnings.warn(
1022 "WARNING: Insecure writes have been enabled via environment variable "
1023 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
1024 "variable or set its value to 'False'.",
1025 stacklevel=2,
1026 )