Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 18%
440 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""Path utility functions."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any, Dict, Iterator, List, Optional
22import platformdirs
24from .utils import deprecation
26pjoin = os.path.join
28# Capitalize Jupyter in paths only on Windows and MacOS
29APPNAME = "Jupyter" if sys.platform in ("win32", "darwin") else "jupyter"
31# UF_HIDDEN is a stat flag not defined in the stat module.
32# It is used by BSD to indicate hidden files.
33UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)
36def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
37 """Return the boolean value of a given environment variable.
39 An environment variable is considered set if it is assigned to a value
40 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive)
42 If the environment variable is not defined, the default value is returned.
43 """
44 if name not in os.environ:
45 return default
47 return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"]
50def use_platform_dirs() -> bool:
51 """Determine if platformdirs should be used for system-specific paths.
53 We plan for this to default to False in jupyter_core version 5 and to True
54 in jupyter_core version 6.
55 """
56 return envset("JUPYTER_PLATFORM_DIRS", False) # type:ignore[return-value]
59def get_home_dir() -> str:
60 """Get the real path of the home directory"""
61 homedir = os.path.expanduser("~")
62 # Next line will make things work even when /home/ is a symlink to
63 # /usr/home as it is on FreeBSD, for example
64 homedir = str(Path(homedir).resolve())
65 return homedir
68_dtemps: Dict[str, str] = {}
71def _do_i_own(path: str) -> bool:
72 """Return whether the current user owns the given path"""
73 p = Path(path).resolve()
75 # walk up to first existing parent
76 while not p.exists() and p != p.parent:
77 p = p.parent
79 # simplest check: owner by name
80 # not always implemented or available
81 try:
82 return p.owner() == os.getlogin()
83 except Exception: # noqa
84 pass
86 if hasattr(os, 'geteuid'):
87 try:
88 st = p.stat()
89 return st.st_uid == os.geteuid()
90 except (NotImplementedError, OSError):
91 # geteuid not always implemented
92 pass
94 # no ownership checks worked, check write access
95 return os.access(p, os.W_OK)
98def prefer_environment_over_user() -> bool:
99 """Determine if environment-level paths should take precedence over user-level paths."""
100 # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value
101 if "JUPYTER_PREFER_ENV_PATH" in os.environ:
102 return envset("JUPYTER_PREFER_ENV_PATH") # type:ignore[return-value]
104 # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def)
105 if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix):
106 return True
108 # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True
109 if (
110 "CONDA_PREFIX" in os.environ
111 and sys.prefix.startswith(os.environ["CONDA_PREFIX"])
112 and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
113 and _do_i_own(sys.prefix)
114 ):
115 return True
117 return False
120def _mkdtemp_once(name: str) -> str:
121 """Make or reuse a temporary directory.
123 If this is called with the same name in the same process, it will return
124 the same directory.
125 """
126 try:
127 return _dtemps[name]
128 except KeyError:
129 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
130 return d
133def jupyter_config_dir() -> str:
134 """Get the Jupyter config directory for this platform and user.
136 Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate
137 directory for the platform.
138 """
140 env = os.environ
141 if env.get("JUPYTER_NO_CONFIG"):
142 return _mkdtemp_once("jupyter-clean-cfg")
144 if env.get("JUPYTER_CONFIG_DIR"):
145 return env["JUPYTER_CONFIG_DIR"]
147 if use_platform_dirs():
148 return platformdirs.user_config_dir(APPNAME, appauthor=False)
150 home_dir = get_home_dir()
151 return pjoin(home_dir, ".jupyter")
154def jupyter_data_dir() -> str:
155 """Get the config directory for Jupyter data files for this platform and user.
157 These are non-transient, non-configuration files.
159 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path.
160 """
161 env = os.environ
163 if env.get("JUPYTER_DATA_DIR"):
164 return env["JUPYTER_DATA_DIR"]
166 if use_platform_dirs():
167 return platformdirs.user_data_dir(APPNAME, appauthor=False)
169 home = get_home_dir()
171 if sys.platform == "darwin":
172 return os.path.join(home, "Library", "Jupyter")
173 elif os.name == "nt":
174 appdata = os.environ.get("APPDATA", None)
175 if appdata:
176 return str(Path(appdata, "jupyter").resolve())
177 else:
178 return pjoin(jupyter_config_dir(), "data")
179 else:
180 # Linux, non-OS X Unix, AIX, etc.
181 xdg = env.get("XDG_DATA_HOME", None)
182 if not xdg:
183 xdg = pjoin(home, ".local", "share")
184 return pjoin(xdg, "jupyter")
187def jupyter_runtime_dir() -> str:
188 """Return the runtime dir for transient jupyter files.
190 Returns JUPYTER_RUNTIME_DIR if defined.
192 The default is now (data_dir)/runtime on all platforms;
193 we no longer use XDG_RUNTIME_DIR after various problems.
194 """
195 env = os.environ
197 if env.get("JUPYTER_RUNTIME_DIR"):
198 return env["JUPYTER_RUNTIME_DIR"]
200 return pjoin(jupyter_data_dir(), "runtime")
203if use_platform_dirs():
204 SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
205 APPNAME, appauthor=False, multipath=True
206 ).split(os.pathsep)
207else:
208 deprecation(
209 "Jupyter is migrating its paths to use standard platformdirs\n"
210 "given by the platformdirs library. To remove this warning and\n"
211 "see the appropriate new directories, set the environment variable\n"
212 "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
213 "The use of platformdirs will be the default in `jupyter_core` v6"
214 )
215 if os.name == "nt":
216 programdata = os.environ.get("PROGRAMDATA", None)
217 if programdata:
218 SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")]
219 else: # PROGRAMDATA is not defined by default on XP.
220 SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, "share", "jupyter")]
221 else:
222 SYSTEM_JUPYTER_PATH = [
223 "/usr/local/share/jupyter",
224 "/usr/share/jupyter",
225 ]
227ENV_JUPYTER_PATH: List[str] = [os.path.join(sys.prefix, "share", "jupyter")]
230def jupyter_path(*subdirs: str) -> List[str]:
231 """Return a list of directories to search for data files
233 JUPYTER_PATH environment variable has highest priority.
235 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level
236 directories will have priority over user-level directories.
238 If the Python site.ENABLE_USER_SITE variable is True, we also add the
239 appropriate Python user site subdirectory to the user-level directories.
242 If ``*subdirs`` are given, that subdirectory will be added to each element.
244 Examples:
246 >>> jupyter_path()
247 ['~/.local/jupyter', '/usr/local/share/jupyter']
248 >>> jupyter_path('kernels')
249 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels']
250 """
252 paths: List[str] = []
254 # highest priority is explicit environment variable
255 if os.environ.get("JUPYTER_PATH"):
256 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep))
258 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
259 user = [jupyter_data_dir()]
260 if site.ENABLE_USER_SITE:
261 # Check if site.getuserbase() exists to be compatible with virtualenv,
262 # which often does not have this method.
263 userbase: Optional[str]
264 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
266 if userbase:
267 userdir = os.path.join(userbase, "share", "jupyter")
268 if userdir not in user:
269 user.append(userdir)
271 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]
273 if prefer_environment_over_user():
274 paths.extend(env)
275 paths.extend(user)
276 else:
277 paths.extend(user)
278 paths.extend(env)
280 # finally, system
281 paths.extend(SYSTEM_JUPYTER_PATH)
283 # add subdir, if requested
284 if subdirs:
285 paths = [pjoin(p, *subdirs) for p in paths]
286 return paths
289if use_platform_dirs():
290 SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
291 APPNAME, appauthor=False, multipath=True
292 ).split(os.pathsep)
293else:
294 if os.name == "nt": # noqa
295 programdata = os.environ.get("PROGRAMDATA", None)
296 if programdata: # noqa
297 SYSTEM_CONFIG_PATH = [os.path.join(programdata, "jupyter")]
298 else: # PROGRAMDATA is not defined by default on XP.
299 SYSTEM_CONFIG_PATH = []
300 else:
301 SYSTEM_CONFIG_PATH = [
302 "/usr/local/etc/jupyter",
303 "/etc/jupyter",
304 ]
305ENV_CONFIG_PATH: List[str] = [os.path.join(sys.prefix, "etc", "jupyter")]
308def jupyter_config_path() -> List[str]:
309 """Return the search path for Jupyter config files as a list.
311 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
312 environment-level directories will have priority over user-level
313 directories.
315 If the Python site.ENABLE_USER_SITE variable is True, we also add the
316 appropriate Python user site subdirectory to the user-level directories.
317 """
318 if os.environ.get("JUPYTER_NO_CONFIG"):
319 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
320 return [jupyter_config_dir()]
322 paths: List[str] = []
324 # highest priority is explicit environment variable
325 if os.environ.get("JUPYTER_CONFIG_PATH"):
326 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep))
328 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
329 user = [jupyter_config_dir()]
330 if site.ENABLE_USER_SITE:
331 userbase: Optional[str]
332 # Check if site.getuserbase() exists to be compatible with virtualenv,
333 # which often does not have this method.
334 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
336 if userbase:
337 userdir = os.path.join(userbase, "etc", "jupyter")
338 if userdir not in user:
339 user.append(userdir)
341 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]
343 if prefer_environment_over_user():
344 paths.extend(env)
345 paths.extend(user)
346 else:
347 paths.extend(user)
348 paths.extend(env)
350 # Finally, system path
351 paths.extend(SYSTEM_CONFIG_PATH)
352 return paths
355def exists(path: str) -> bool:
356 """Replacement for `os.path.exists` which works for host mapped volumes
357 on Windows containers
358 """
359 try:
360 os.lstat(path)
361 except OSError:
362 return False
363 return True
366def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
367 """Is a file hidden?
369 This only checks the file itself; it should be called in combination with
370 checking the directory containing the file.
372 Use is_hidden() instead to check the file and its parent directories.
374 Parameters
375 ----------
376 abs_path : unicode
377 The absolute path to check.
378 stat_res : os.stat_result, optional
379 The result of calling stat() on abs_path. If not passed, this function
380 will call stat() internally.
381 """
382 if os.path.basename(abs_path).startswith("."):
383 return True
385 if stat_res is None:
386 try:
387 stat_res = os.stat(abs_path)
388 except OSError as e:
389 if e.errno == errno.ENOENT:
390 return False
391 raise
393 try:
394 if stat_res.st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN: # type:ignore
395 return True
396 except AttributeError:
397 # allow AttributeError on PyPy for Windows
398 # 'stat_result' object has no attribute 'st_file_attributes'
399 # https://foss.heptapod.net/pypy/pypy/-/issues/3469
400 warnings.warn(
401 "hidden files are not detectable on this system, so no file will be marked as hidden.",
402 stacklevel=2,
403 )
404 pass
406 return False
409def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
410 """Is a file hidden?
412 This only checks the file itself; it should be called in combination with
413 checking the directory containing the file.
415 Use is_hidden() instead to check the file and its parent directories.
417 Parameters
418 ----------
419 abs_path : unicode
420 The absolute path to check.
421 stat_res : os.stat_result, optional
422 The result of calling stat() on abs_path. If not passed, this function
423 will call stat() internally.
424 """
425 if os.path.basename(abs_path).startswith("."):
426 return True
428 if stat_res is None or stat.S_ISLNK(stat_res.st_mode):
429 try:
430 stat_res = os.stat(abs_path)
431 except OSError as e:
432 if e.errno == errno.ENOENT:
433 return False
434 raise
436 # check that dirs can be listed
437 if stat.S_ISDIR(stat_res.st_mode): # type:ignore[misc] # noqa
438 # use x-access, not actual listing, in case of slow/large listings
439 if not os.access(abs_path, os.X_OK | os.R_OK):
440 return True
442 # check UF_HIDDEN
443 if getattr(stat_res, "st_flags", 0) & UF_HIDDEN:
444 return True
446 return False
449if sys.platform == "win32":
450 is_file_hidden = is_file_hidden_win
451else:
452 is_file_hidden = is_file_hidden_posix
455def is_hidden(abs_path: str, abs_root: str = "") -> bool:
456 """Is a file hidden or contained in a hidden directory?
458 This will start with the rightmost path element and work backwards to the
459 given root to see if a path is hidden or in a hidden directory. Hidden is
460 determined by either name starting with '.' or the UF_HIDDEN flag as
461 reported by stat.
463 If abs_path is the same directory as abs_root, it will be visible even if
464 that is a hidden folder. This only checks the visibility of files
465 and directories *within* abs_root.
467 Parameters
468 ----------
469 abs_path : unicode
470 The absolute path to check for hidden directories.
471 abs_root : unicode
472 The absolute path of the root directory in which hidden directories
473 should be checked for.
474 """
475 abs_path = os.path.normpath(abs_path)
476 abs_root = os.path.normpath(abs_root)
478 if abs_path == abs_root:
479 return False
481 if is_file_hidden(abs_path):
482 return True
484 if not abs_root:
485 abs_root = abs_path.split(os.sep, 1)[0] + os.sep
486 inside_root = abs_path[len(abs_root) :]
487 if any(part.startswith(".") for part in inside_root.split(os.sep)):
488 return True
490 # check UF_HIDDEN on any location up to root.
491 # is_file_hidden() already checked the file, so start from its parent dir
492 path = os.path.dirname(abs_path)
493 while path and path.startswith(abs_root) and path != abs_root:
494 if not exists(path):
495 path = os.path.dirname(path)
496 continue
497 try:
498 # may fail on Windows junctions
499 st = os.lstat(path)
500 except OSError:
501 return True
502 if getattr(st, "st_flags", 0) & UF_HIDDEN:
503 return True
504 path = os.path.dirname(path)
506 return False
509def win32_restrict_file_to_user(fname: str) -> None:
510 """Secure a windows file to read-only access for the user.
511 Follows guidance from win32 library creator:
512 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
514 This method should be executed against an already generated file which
515 has no secrets written to it yet.
517 Parameters
518 ----------
520 fname : unicode
521 The path to the file to secure
522 """
523 try:
524 import win32api
525 except ImportError:
526 return _win32_restrict_file_to_user_ctypes(fname)
528 import ntsecuritycon as con
529 import win32security
531 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone")
532 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
533 user, _domain, _type = win32security.LookupAccountName(
534 "", win32api.GetUserNameEx(win32api.NameSamCompatible)
535 )
537 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)
539 dacl = win32security.ACL()
540 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone)
541 dacl.AddAccessAllowedAce(
542 win32security.ACL_REVISION,
543 con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE,
544 user,
545 )
546 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)
548 sd.SetSecurityDescriptorDacl(1, dacl, 0)
549 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)
552def _win32_restrict_file_to_user_ctypes(fname: str) -> None: # noqa
553 """Secure a windows file to read-only access for the user.
555 Follows guidance from win32 library creator:
556 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
558 This method should be executed against an already generated file which
559 has no secrets written to it yet.
561 Parameters
562 ----------
564 fname : unicode
565 The path to the file to secure
566 """
567 import ctypes
568 from ctypes import wintypes
570 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined]
571 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined]
573 NameSamCompatible = 2
574 WinBuiltinAdministratorsSid = 26
575 DACL_SECURITY_INFORMATION = 4
576 ACL_REVISION = 2
577 ERROR_INSUFFICIENT_BUFFER = 122
578 ERROR_MORE_DATA = 234
580 SYNCHRONIZE = 0x100000
581 DELETE = 0x00010000
582 STANDARD_RIGHTS_REQUIRED = 0xF0000
583 STANDARD_RIGHTS_READ = 0x20000
584 STANDARD_RIGHTS_WRITE = 0x20000
585 FILE_READ_DATA = 1
586 FILE_READ_EA = 8
587 FILE_READ_ATTRIBUTES = 128
588 FILE_WRITE_DATA = 2
589 FILE_APPEND_DATA = 4
590 FILE_WRITE_EA = 16
591 FILE_WRITE_ATTRIBUTES = 256
592 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
593 FILE_GENERIC_READ = (
594 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
595 )
596 FILE_GENERIC_WRITE = (
597 STANDARD_RIGHTS_WRITE
598 | FILE_WRITE_DATA
599 | FILE_WRITE_ATTRIBUTES
600 | FILE_WRITE_EA
601 | FILE_APPEND_DATA
602 | SYNCHRONIZE
603 )
605 class ACL(ctypes.Structure):
606 _fields_ = [
607 ("AclRevision", wintypes.BYTE),
608 ("Sbz1", wintypes.BYTE),
609 ("AclSize", wintypes.WORD),
610 ("AceCount", wintypes.WORD),
611 ("Sbz2", wintypes.WORD),
612 ]
614 PSID = ctypes.c_void_p
615 PACL = ctypes.POINTER(ACL)
616 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)
618 def _nonzero_success(result, func, args):
619 if not result:
620 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined]
621 return args
623 secur32.GetUserNameExW.errcheck = _nonzero_success
624 secur32.GetUserNameExW.restype = wintypes.BOOL
625 secur32.GetUserNameExW.argtypes = (
626 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat
627 wintypes.LPWSTR, # LPWSTR lpNameBuffer,
628 wintypes.PULONG, # PULONG nSize
629 )
631 advapi32.CreateWellKnownSid.errcheck = _nonzero_success
632 advapi32.CreateWellKnownSid.restype = wintypes.BOOL
633 advapi32.CreateWellKnownSid.argtypes = (
634 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType
635 PSID, # PSID DomainSid
636 PSID, # PSID pSid
637 wintypes.PDWORD, # DWORD *cbSid
638 )
640 advapi32.LookupAccountNameW.errcheck = _nonzero_success
641 advapi32.LookupAccountNameW.restype = wintypes.BOOL
642 advapi32.LookupAccountNameW.argtypes = (
643 wintypes.LPWSTR, # LPCWSTR lpSystemName
644 wintypes.LPWSTR, # LPCWSTR lpAccountName
645 PSID, # PSID Sid
646 wintypes.LPDWORD, # LPDWORD cbSid
647 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName
648 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName
649 wintypes.LPDWORD, # PSID_NAME_USE peUse
650 )
652 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
653 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
654 advapi32.AddAccessAllowedAce.argtypes = (
655 PACL, # PACL pAcl
656 wintypes.DWORD, # DWORD dwAceRevision
657 wintypes.DWORD, # DWORD AccessMask
658 PSID, # PSID pSid
659 )
661 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
662 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
663 advapi32.SetSecurityDescriptorDacl.argtypes = (
664 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
665 wintypes.BOOL, # BOOL bDaclPresent
666 PACL, # PACL pDacl
667 wintypes.BOOL, # BOOL bDaclDefaulted
668 )
670 advapi32.GetFileSecurityW.errcheck = _nonzero_success
671 advapi32.GetFileSecurityW.restype = wintypes.BOOL
672 advapi32.GetFileSecurityW.argtypes = (
673 wintypes.LPCWSTR, # LPCWSTR lpFileName
674 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation
675 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
676 wintypes.DWORD, # DWORD nLength
677 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded
678 )
680 advapi32.SetFileSecurityW.errcheck = _nonzero_success
681 advapi32.SetFileSecurityW.restype = wintypes.BOOL
682 advapi32.SetFileSecurityW.argtypes = (
683 wintypes.LPCWSTR, # LPCWSTR lpFileName
684 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation
685 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
686 )
688 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
689 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
690 advapi32.MakeAbsoluteSD.argtypes = (
691 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
692 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
693 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize
694 PACL, # PACL pDacl
695 wintypes.LPDWORD, # LPDWORD lpdwDaclSize
696 PACL, # PACL pSacl
697 wintypes.LPDWORD, # LPDWORD lpdwSaclSize
698 PSID, # PSID pOwner
699 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize
700 PSID, # PSID pPrimaryGroup
701 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize
702 )
704 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
705 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
706 advapi32.MakeSelfRelativeSD.argtypes = (
707 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
708 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
709 wintypes.LPDWORD, # LPDWORD lpdwBufferLength
710 )
712 advapi32.InitializeAcl.errcheck = _nonzero_success
713 advapi32.InitializeAcl.restype = wintypes.BOOL
714 advapi32.InitializeAcl.argtypes = (
715 PACL, # PACL pAcl,
716 wintypes.DWORD, # DWORD nAclLength,
717 wintypes.DWORD, # DWORD dwAclRevision
718 )
720 def CreateWellKnownSid(WellKnownSidType):
721 # return a SID for predefined aliases
722 pSid = (ctypes.c_char * 1)()
723 cbSid = wintypes.DWORD()
724 try:
725 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
726 except OSError as e:
727 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
728 raise
729 pSid = (ctypes.c_char * cbSid.value)()
730 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
731 return pSid[:]
733 def GetUserNameEx(NameFormat):
734 # return the user or other security principal associated with
735 # the calling thread
736 nSize = ctypes.pointer(ctypes.c_ulong(0))
737 try:
738 secur32.GetUserNameExW(NameFormat, None, nSize)
739 except OSError as e:
740 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined]
741 raise
742 if not nSize.contents.value:
743 return None
744 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
745 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
746 return lpNameBuffer.value
748 def LookupAccountName(lpSystemName, lpAccountName):
749 # return a security identifier (SID) for an account on a system
750 # and the name of the domain on which the account was found
751 cbSid = wintypes.DWORD(0)
752 cchReferencedDomainName = wintypes.DWORD(0)
753 peUse = wintypes.DWORD(0)
754 try:
755 advapi32.LookupAccountNameW(
756 lpSystemName,
757 lpAccountName,
758 None,
759 ctypes.byref(cbSid),
760 None,
761 ctypes.byref(cchReferencedDomainName),
762 ctypes.byref(peUse),
763 )
764 except OSError as e:
765 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
766 raise
767 Sid = ctypes.create_unicode_buffer("", cbSid.value)
768 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
769 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
770 success = advapi32.LookupAccountNameW(
771 lpSystemName,
772 lpAccountName,
773 pSid,
774 ctypes.byref(cbSid),
775 lpReferencedDomainName,
776 ctypes.byref(cchReferencedDomainName),
777 ctypes.byref(peUse),
778 )
779 if not success:
780 raise ctypes.WinError() # type:ignore[attr-defined]
781 return pSid, lpReferencedDomainName.value, peUse.value
783 def AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid):
784 # add an access-allowed access control entry (ACE)
785 # to an access control list (ACL)
786 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)
788 def GetFileSecurity(lpFileName, RequestedInformation):
789 # return information about the security of a file or directory
790 nLength = wintypes.DWORD(0)
791 try:
792 advapi32.GetFileSecurityW(
793 lpFileName,
794 RequestedInformation,
795 None,
796 0,
797 ctypes.byref(nLength),
798 )
799 except OSError as e:
800 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
801 raise
802 if not nLength.value:
803 return None
804 pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
805 advapi32.GetFileSecurityW(
806 lpFileName,
807 RequestedInformation,
808 pSecurityDescriptor,
809 nLength,
810 ctypes.byref(nLength),
811 )
812 return pSecurityDescriptor
814 def SetFileSecurity(lpFileName, RequestedInformation, pSecurityDescriptor):
815 # set the security of a file or directory object
816 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)
818 def SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted):
819 # set information in a discretionary access control list (DACL)
820 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)
822 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor):
823 # return a security descriptor in absolute format
824 # by using a security descriptor in self-relative format as a template
825 pAbsoluteSecurityDescriptor = None
826 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
827 pDacl = None
828 lpdwDaclSize = wintypes.DWORD(0)
829 pSacl = None
830 lpdwSaclSize = wintypes.DWORD(0)
831 pOwner = None
832 lpdwOwnerSize = wintypes.DWORD(0)
833 pPrimaryGroup = None
834 lpdwPrimaryGroupSize = wintypes.DWORD(0)
835 try:
836 advapi32.MakeAbsoluteSD(
837 pSelfRelativeSecurityDescriptor,
838 pAbsoluteSecurityDescriptor,
839 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
840 pDacl,
841 ctypes.byref(lpdwDaclSize),
842 pSacl,
843 ctypes.byref(lpdwSaclSize),
844 pOwner,
845 ctypes.byref(lpdwOwnerSize),
846 pPrimaryGroup,
847 ctypes.byref(lpdwPrimaryGroupSize),
848 )
849 except OSError as e:
850 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
851 raise
852 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
853 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
854 pDacl = ctypes.cast(pDaclData, PACL).contents
855 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
856 pSacl = ctypes.cast(pSaclData, PACL).contents
857 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
858 pOwner = ctypes.cast(pOwnerData, PSID)
859 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
860 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
861 advapi32.MakeAbsoluteSD(
862 pSelfRelativeSecurityDescriptor,
863 pAbsoluteSecurityDescriptor,
864 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
865 pDacl,
866 ctypes.byref(lpdwDaclSize),
867 pSacl,
868 ctypes.byref(lpdwSaclSize),
869 pOwner,
870 lpdwOwnerSize,
871 pPrimaryGroup,
872 ctypes.byref(lpdwPrimaryGroupSize),
873 )
874 return pAbsoluteSecurityDescriptor
876 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor):
877 # return a security descriptor in self-relative format
878 # by using a security descriptor in absolute format as a template
879 pSelfRelativeSecurityDescriptor = None
880 lpdwBufferLength = wintypes.DWORD(0)
881 try:
882 advapi32.MakeSelfRelativeSD(
883 pAbsoluteSecurityDescriptor,
884 pSelfRelativeSecurityDescriptor,
885 ctypes.byref(lpdwBufferLength),
886 )
887 except OSError as e:
888 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
889 raise
890 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
891 advapi32.MakeSelfRelativeSD(
892 pAbsoluteSecurityDescriptor,
893 pSelfRelativeSecurityDescriptor,
894 ctypes.byref(lpdwBufferLength),
895 )
896 return pSelfRelativeSecurityDescriptor
898 def NewAcl():
899 # return a new, initialized ACL (access control list) structure
900 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ?
901 acl_data = ctypes.create_string_buffer(nAclLength)
902 pAcl = ctypes.cast(acl_data, PACL).contents
903 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
904 return pAcl
906 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
907 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]
909 Acl = NewAcl()
910 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
911 AddAccessAllowedAce(
912 Acl,
913 ACL_REVISION,
914 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
915 SidUser,
916 )
918 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
919 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
920 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
921 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)
923 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
926def get_file_mode(fname: str) -> int:
927 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner.
929 Parameters
930 ----------
932 fname : unicode
933 The path to the file to get mode from
935 """
936 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we
937 # should tolerate the execute bit on the file's owner when validating permissions - thus
938 # the missing least significant bit on the third octal digit. In addition, we also tolerate
939 # the sticky bit being set, so the lsb from the fourth octal digit is also removed.
940 return (
941 stat.S_IMODE(os.stat(fname).st_mode) & 0o6677
942 ) # Use 4 octal digits since S_IMODE does the same
945allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1")
948@contextmanager
949def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
950 """Opens a file in the most restricted pattern available for
951 writing content. This limits the file mode to `0o0600` and yields
952 the resulting opened filed handle.
954 Parameters
955 ----------
957 fname : unicode
958 The path to the file to write
960 binary: boolean
961 Indicates that the file is binary
962 """
963 mode = "wb" if binary else "w"
964 encoding = None if binary else "utf-8"
965 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
966 try:
967 os.remove(fname)
968 except OSError:
969 # Skip any issues with the file not existing
970 pass
972 if os.name == "nt":
973 if allow_insecure_writes:
974 # Mounted file systems can have a number of failure modes inside this block.
975 # For windows machines in insecure mode we simply skip this to avoid failures :/
976 issue_insecure_write_warning()
977 else:
978 # Python on windows does not respect the group and public bits for chmod, so we need
979 # to take additional steps to secure the contents.
980 # Touch file pre-emptively to avoid editing permissions in open files in Windows
981 fd = os.open(fname, open_flag, 0o0600)
982 os.close(fd)
983 open_flag = os.O_WRONLY | os.O_TRUNC
984 win32_restrict_file_to_user(fname)
986 with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
987 if os.name != "nt":
988 # Enforce that the file got the requested permissions before writing
989 file_mode = get_file_mode(fname)
990 if file_mode != 0o0600: # noqa
991 if allow_insecure_writes:
992 issue_insecure_write_warning()
993 else:
994 msg = (
995 "Permissions assignment failed for secure file: '{file}'."
996 " Got '{permissions}' instead of '0o0600'.".format(
997 file=fname, permissions=oct(file_mode)
998 )
999 )
1000 raise RuntimeError(msg)
1001 yield f
1004def issue_insecure_write_warning() -> None:
1005 """Issue an insecure write warning."""
1007 def format_warning(msg, *args, **kwargs):
1008 return str(msg) + "\n"
1010 warnings.formatwarning = format_warning # type:ignore[assignment]
1011 warnings.warn(
1012 "WARNING: Insecure writes have been enabled via environment variable "
1013 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
1014 "variable or set its value to 'False'.",
1015 stacklevel=2,
1016 )