Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 18%
440 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 06:10 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-03 06:10 +0000
1"""Path utility functions."""
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any, Dict, Iterator, List, Optional
22import platformdirs
24from .utils import deprecation
26pjoin = os.path.join
28# Capitalize Jupyter in paths only on Windows and MacOS
29APPNAME = "Jupyter" if sys.platform in ("win32", "darwin") else "jupyter"
31# UF_HIDDEN is a stat flag not defined in the stat module.
32# It is used by BSD to indicate hidden files.
33UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)
36def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
37 """Return the boolean value of a given environment variable.
39 An environment variable is considered set if it is assigned to a value
40 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive)
42 If the environment variable is not defined, the default value is returned.
43 """
44 if name not in os.environ:
45 return default
47 return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"]
50def use_platform_dirs() -> bool:
51 """Determine if platformdirs should be used for system-specific paths.
53 We plan for this to default to False in jupyter_core version 5 and to True
54 in jupyter_core version 6.
55 """
56 return envset("JUPYTER_PLATFORM_DIRS", False) # type:ignore[return-value]
59def get_home_dir() -> str:
60 """Get the real path of the home directory"""
61 homedir = os.path.expanduser("~")
62 # Next line will make things work even when /home/ is a symlink to
63 # /usr/home as it is on FreeBSD, for example
64 homedir = str(Path(homedir).resolve())
65 return homedir
68_dtemps: Dict[str, str] = {}
71def _do_i_own(path: str) -> bool:
72 """Return whether the current user owns the given path"""
73 p = Path(path).resolve()
75 # walk up to first existing parent
76 while not p.exists() and p != p.parent:
77 p = p.parent
79 # simplest check: owner by name
80 # not always implemented or available
81 try:
82 return p.owner() == os.getlogin()
83 except Exception: # noqa
84 pass
86 if hasattr(os, 'geteuid'):
87 try:
88 st = p.stat()
89 return st.st_uid == os.geteuid()
90 except (NotImplementedError, OSError):
91 # geteuid not always implemented
92 pass
94 # no ownership checks worked, check write access
95 return os.access(p, os.W_OK)
98def prefer_environment_over_user() -> bool:
99 """Determine if environment-level paths should take precedence over user-level paths."""
100 # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value
101 if "JUPYTER_PREFER_ENV_PATH" in os.environ:
102 return envset("JUPYTER_PREFER_ENV_PATH") # type:ignore[return-value]
104 # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def)
105 if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix):
106 return True
108 # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True
109 if (
110 "CONDA_PREFIX" in os.environ
111 and sys.prefix.startswith(os.environ["CONDA_PREFIX"])
112 and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
113 and _do_i_own(sys.prefix)
114 ):
115 return True
117 return False
120def _mkdtemp_once(name: str) -> str:
121 """Make or reuse a temporary directory.
123 If this is called with the same name in the same process, it will return
124 the same directory.
125 """
126 try:
127 return _dtemps[name]
128 except KeyError:
129 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
130 return d
133def jupyter_config_dir() -> str:
134 """Get the Jupyter config directory for this platform and user.
136 Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate
137 directory for the platform.
138 """
140 env = os.environ
141 if env.get("JUPYTER_NO_CONFIG"):
142 return _mkdtemp_once("jupyter-clean-cfg")
144 if env.get("JUPYTER_CONFIG_DIR"):
145 return env["JUPYTER_CONFIG_DIR"]
147 if use_platform_dirs():
148 return platformdirs.user_config_dir(APPNAME, appauthor=False)
150 home_dir = get_home_dir()
151 return pjoin(home_dir, ".jupyter")
154def jupyter_data_dir() -> str:
155 """Get the config directory for Jupyter data files for this platform and user.
157 These are non-transient, non-configuration files.
159 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path.
160 """
161 env = os.environ
163 if env.get("JUPYTER_DATA_DIR"):
164 return env["JUPYTER_DATA_DIR"]
166 if use_platform_dirs():
167 return platformdirs.user_data_dir(APPNAME, appauthor=False)
169 home = get_home_dir()
171 if sys.platform == "darwin":
172 return os.path.join(home, "Library", "Jupyter")
173 elif os.name == "nt":
174 appdata = os.environ.get("APPDATA", None)
175 if appdata:
176 return str(Path(appdata, "jupyter").resolve())
177 else:
178 return pjoin(jupyter_config_dir(), "data")
179 else:
180 # Linux, non-OS X Unix, AIX, etc.
181 xdg = env.get("XDG_DATA_HOME", None)
182 if not xdg:
183 xdg = pjoin(home, ".local", "share")
184 return pjoin(xdg, "jupyter")
187def jupyter_runtime_dir() -> str:
188 """Return the runtime dir for transient jupyter files.
190 Returns JUPYTER_RUNTIME_DIR if defined.
192 The default is now (data_dir)/runtime on all platforms;
193 we no longer use XDG_RUNTIME_DIR after various problems.
194 """
195 env = os.environ
197 if env.get("JUPYTER_RUNTIME_DIR"):
198 return env["JUPYTER_RUNTIME_DIR"]
200 return pjoin(jupyter_data_dir(), "runtime")
203if use_platform_dirs():
204 SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
205 APPNAME, appauthor=False, multipath=True
206 ).split(os.pathsep)
207else:
208 deprecation(
209 "Jupyter is migrating its paths to use standard platformdirs\n"
210 "given by the platformdirs library. To remove this warning and\n"
211 "see the appropriate new directories, set the environment variable\n"
212 "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
213 "The use of platformdirs will be the default in `jupyter_core` v6"
214 )
215 if os.name == "nt":
216 programdata = os.environ.get("PROGRAMDATA", None)
217 if programdata:
218 SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")]
219 else: # PROGRAMDATA is not defined by default on XP.
220 SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, "share", "jupyter")]
221 else:
222 SYSTEM_JUPYTER_PATH = [
223 "/usr/local/share/jupyter",
224 "/usr/share/jupyter",
225 ]
227ENV_JUPYTER_PATH: List[str] = [os.path.join(sys.prefix, "share", "jupyter")]
230def jupyter_path(*subdirs: str) -> List[str]:
231 """Return a list of directories to search for data files
233 JUPYTER_PATH environment variable has highest priority.
235 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level
236 directories will have priority over user-level directories.
238 If the Python site.ENABLE_USER_SITE variable is True, we also add the
239 appropriate Python user site subdirectory to the user-level directories.
242 If ``*subdirs`` are given, that subdirectory will be added to each element.
244 Examples:
246 >>> jupyter_path()
247 ['~/.local/jupyter', '/usr/local/share/jupyter']
248 >>> jupyter_path('kernels')
249 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels']
250 """
252 paths: List[str] = []
254 # highest priority is explicit environment variable
255 if os.environ.get("JUPYTER_PATH"):
256 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep))
258 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
259 user = [jupyter_data_dir()]
260 if site.ENABLE_USER_SITE:
261 # Check if site.getuserbase() exists to be compatible with virtualenv,
262 # which often does not have this method.
263 userbase: Optional[str]
264 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
266 if userbase:
267 userdir = os.path.join(userbase, "share", "jupyter")
268 if userdir not in user:
269 user.append(userdir)
271 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]
273 if prefer_environment_over_user():
274 paths.extend(env)
275 paths.extend(user)
276 else:
277 paths.extend(user)
278 paths.extend(env)
280 # finally, system
281 paths.extend(SYSTEM_JUPYTER_PATH)
283 # add subdir, if requested
284 if subdirs:
285 paths = [pjoin(p, *subdirs) for p in paths]
286 return paths
289if use_platform_dirs():
290 SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
291 APPNAME, appauthor=False, multipath=True
292 ).split(os.pathsep)
293else:
294 if os.name == "nt": # noqa
295 programdata = os.environ.get("PROGRAMDATA", None)
296 if programdata: # noqa
297 SYSTEM_CONFIG_PATH = [os.path.join(programdata, "jupyter")]
298 else: # PROGRAMDATA is not defined by default on XP.
299 SYSTEM_CONFIG_PATH = []
300 else:
301 SYSTEM_CONFIG_PATH = [
302 "/usr/local/etc/jupyter",
303 "/etc/jupyter",
304 ]
305ENV_CONFIG_PATH: List[str] = [os.path.join(sys.prefix, "etc", "jupyter")]
308def jupyter_config_path() -> List[str]:
309 """Return the search path for Jupyter config files as a list.
311 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
312 environment-level directories will have priority over user-level
313 directories.
315 If the Python site.ENABLE_USER_SITE variable is True, we also add the
316 appropriate Python user site subdirectory to the user-level directories.
317 """
318 if os.environ.get("JUPYTER_NO_CONFIG"):
319 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
320 return [jupyter_config_dir()]
322 paths: List[str] = []
324 # highest priority is explicit environment variable
325 if os.environ.get("JUPYTER_CONFIG_PATH"):
326 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep))
328 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
329 user = [jupyter_config_dir()]
330 if site.ENABLE_USER_SITE:
331 userbase: Optional[str]
332 # Check if site.getuserbase() exists to be compatible with virtualenv,
333 # which often does not have this method.
334 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE
336 if userbase:
337 userdir = os.path.join(userbase, "etc", "jupyter")
338 if userdir not in user:
339 user.append(userdir)
341 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]
343 if prefer_environment_over_user():
344 paths.extend(env)
345 paths.extend(user)
346 else:
347 paths.extend(user)
348 paths.extend(env)
350 # Finally, system path
351 paths.extend(SYSTEM_CONFIG_PATH)
352 return paths
355def exists(path: str) -> bool:
356 """Replacement for `os.path.exists` which works for host mapped volumes
357 on Windows containers
358 """
359 try:
360 os.lstat(path)
361 except OSError:
362 return False
363 return True
366def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
367 """Is a file hidden?
369 This only checks the file itself; it should be called in combination with
370 checking the directory containing the file.
372 Use is_hidden() instead to check the file and its parent directories.
374 Parameters
375 ----------
376 abs_path : unicode
377 The absolute path to check.
378 stat_res : os.stat_result, optional
379 The result of calling stat() on abs_path. If not passed, this function
380 will call stat() internally.
381 """
382 if os.path.basename(abs_path).startswith("."):
383 return True
385 if stat_res is None:
386 try:
387 stat_res = os.stat(abs_path)
388 except OSError as e:
389 if e.errno == errno.ENOENT:
390 return False
391 raise
393 try:
394 if stat_res.st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN: # type:ignore
395 return True
396 except AttributeError:
397 # allow AttributeError on PyPy for Windows
398 # 'stat_result' object has no attribute 'st_file_attributes'
399 # https://foss.heptapod.net/pypy/pypy/-/issues/3469
400 warnings.warn(
401 "hidden files are not detectable on this system, so no file will be marked as hidden."
402 )
403 pass
405 return False
408def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
409 """Is a file hidden?
411 This only checks the file itself; it should be called in combination with
412 checking the directory containing the file.
414 Use is_hidden() instead to check the file and its parent directories.
416 Parameters
417 ----------
418 abs_path : unicode
419 The absolute path to check.
420 stat_res : os.stat_result, optional
421 The result of calling stat() on abs_path. If not passed, this function
422 will call stat() internally.
423 """
424 if os.path.basename(abs_path).startswith("."):
425 return True
427 if stat_res is None or stat.S_ISLNK(stat_res.st_mode):
428 try:
429 stat_res = os.stat(abs_path)
430 except OSError as e:
431 if e.errno == errno.ENOENT:
432 return False
433 raise
435 # check that dirs can be listed
436 if stat.S_ISDIR(stat_res.st_mode): # type:ignore[misc] # noqa
437 # use x-access, not actual listing, in case of slow/large listings
438 if not os.access(abs_path, os.X_OK | os.R_OK):
439 return True
441 # check UF_HIDDEN
442 if getattr(stat_res, "st_flags", 0) & UF_HIDDEN:
443 return True
445 return False
448if sys.platform == "win32":
449 is_file_hidden = is_file_hidden_win
450else:
451 is_file_hidden = is_file_hidden_posix
454def is_hidden(abs_path: str, abs_root: str = "") -> bool:
455 """Is a file hidden or contained in a hidden directory?
457 This will start with the rightmost path element and work backwards to the
458 given root to see if a path is hidden or in a hidden directory. Hidden is
459 determined by either name starting with '.' or the UF_HIDDEN flag as
460 reported by stat.
462 If abs_path is the same directory as abs_root, it will be visible even if
463 that is a hidden folder. This only checks the visibility of files
464 and directories *within* abs_root.
466 Parameters
467 ----------
468 abs_path : unicode
469 The absolute path to check for hidden directories.
470 abs_root : unicode
471 The absolute path of the root directory in which hidden directories
472 should be checked for.
473 """
474 abs_path = os.path.normpath(abs_path)
475 abs_root = os.path.normpath(abs_root)
477 if abs_path == abs_root:
478 return False
480 if is_file_hidden(abs_path):
481 return True
483 if not abs_root:
484 abs_root = abs_path.split(os.sep, 1)[0] + os.sep
485 inside_root = abs_path[len(abs_root) :]
486 if any(part.startswith(".") for part in inside_root.split(os.sep)):
487 return True
489 # check UF_HIDDEN on any location up to root.
490 # is_file_hidden() already checked the file, so start from its parent dir
491 path = os.path.dirname(abs_path)
492 while path and path.startswith(abs_root) and path != abs_root:
493 if not exists(path):
494 path = os.path.dirname(path)
495 continue
496 try:
497 # may fail on Windows junctions
498 st = os.lstat(path)
499 except OSError:
500 return True
501 if getattr(st, "st_flags", 0) & UF_HIDDEN:
502 return True
503 path = os.path.dirname(path)
505 return False
508def win32_restrict_file_to_user(fname: str) -> None:
509 """Secure a windows file to read-only access for the user.
510 Follows guidance from win32 library creator:
511 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
513 This method should be executed against an already generated file which
514 has no secrets written to it yet.
516 Parameters
517 ----------
519 fname : unicode
520 The path to the file to secure
521 """
522 try:
523 import win32api
524 except ImportError:
525 return _win32_restrict_file_to_user_ctypes(fname)
527 import ntsecuritycon as con
528 import win32security
530 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone")
531 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
532 user, _domain, _type = win32security.LookupAccountName(
533 "", win32api.GetUserNameEx(win32api.NameSamCompatible)
534 )
536 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)
538 dacl = win32security.ACL()
539 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone)
540 dacl.AddAccessAllowedAce(
541 win32security.ACL_REVISION,
542 con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE,
543 user,
544 )
545 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)
547 sd.SetSecurityDescriptorDacl(1, dacl, 0)
548 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)
551def _win32_restrict_file_to_user_ctypes(fname: str) -> None: # noqa
552 """Secure a windows file to read-only access for the user.
554 Follows guidance from win32 library creator:
555 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
557 This method should be executed against an already generated file which
558 has no secrets written to it yet.
560 Parameters
561 ----------
563 fname : unicode
564 The path to the file to secure
565 """
566 import ctypes
567 from ctypes import wintypes
569 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined]
570 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined]
572 NameSamCompatible = 2
573 WinBuiltinAdministratorsSid = 26
574 DACL_SECURITY_INFORMATION = 4
575 ACL_REVISION = 2
576 ERROR_INSUFFICIENT_BUFFER = 122
577 ERROR_MORE_DATA = 234
579 SYNCHRONIZE = 0x100000
580 DELETE = 0x00010000
581 STANDARD_RIGHTS_REQUIRED = 0xF0000
582 STANDARD_RIGHTS_READ = 0x20000
583 STANDARD_RIGHTS_WRITE = 0x20000
584 FILE_READ_DATA = 1
585 FILE_READ_EA = 8
586 FILE_READ_ATTRIBUTES = 128
587 FILE_WRITE_DATA = 2
588 FILE_APPEND_DATA = 4
589 FILE_WRITE_EA = 16
590 FILE_WRITE_ATTRIBUTES = 256
591 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
592 FILE_GENERIC_READ = (
593 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
594 )
595 FILE_GENERIC_WRITE = (
596 STANDARD_RIGHTS_WRITE
597 | FILE_WRITE_DATA
598 | FILE_WRITE_ATTRIBUTES
599 | FILE_WRITE_EA
600 | FILE_APPEND_DATA
601 | SYNCHRONIZE
602 )
604 class ACL(ctypes.Structure):
605 _fields_ = [
606 ("AclRevision", wintypes.BYTE),
607 ("Sbz1", wintypes.BYTE),
608 ("AclSize", wintypes.WORD),
609 ("AceCount", wintypes.WORD),
610 ("Sbz2", wintypes.WORD),
611 ]
613 PSID = ctypes.c_void_p
614 PACL = ctypes.POINTER(ACL)
615 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)
617 def _nonzero_success(result, func, args):
618 if not result:
619 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined]
620 return args
622 secur32.GetUserNameExW.errcheck = _nonzero_success
623 secur32.GetUserNameExW.restype = wintypes.BOOL
624 secur32.GetUserNameExW.argtypes = (
625 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat
626 wintypes.LPWSTR, # LPWSTR lpNameBuffer,
627 wintypes.PULONG, # PULONG nSize
628 )
630 advapi32.CreateWellKnownSid.errcheck = _nonzero_success
631 advapi32.CreateWellKnownSid.restype = wintypes.BOOL
632 advapi32.CreateWellKnownSid.argtypes = (
633 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType
634 PSID, # PSID DomainSid
635 PSID, # PSID pSid
636 wintypes.PDWORD, # DWORD *cbSid
637 )
639 advapi32.LookupAccountNameW.errcheck = _nonzero_success
640 advapi32.LookupAccountNameW.restype = wintypes.BOOL
641 advapi32.LookupAccountNameW.argtypes = (
642 wintypes.LPWSTR, # LPCWSTR lpSystemName
643 wintypes.LPWSTR, # LPCWSTR lpAccountName
644 PSID, # PSID Sid
645 wintypes.LPDWORD, # LPDWORD cbSid
646 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName
647 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName
648 wintypes.LPDWORD, # PSID_NAME_USE peUse
649 )
651 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
652 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
653 advapi32.AddAccessAllowedAce.argtypes = (
654 PACL, # PACL pAcl
655 wintypes.DWORD, # DWORD dwAceRevision
656 wintypes.DWORD, # DWORD AccessMask
657 PSID, # PSID pSid
658 )
660 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
661 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
662 advapi32.SetSecurityDescriptorDacl.argtypes = (
663 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
664 wintypes.BOOL, # BOOL bDaclPresent
665 PACL, # PACL pDacl
666 wintypes.BOOL, # BOOL bDaclDefaulted
667 )
669 advapi32.GetFileSecurityW.errcheck = _nonzero_success
670 advapi32.GetFileSecurityW.restype = wintypes.BOOL
671 advapi32.GetFileSecurityW.argtypes = (
672 wintypes.LPCWSTR, # LPCWSTR lpFileName
673 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation
674 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
675 wintypes.DWORD, # DWORD nLength
676 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded
677 )
679 advapi32.SetFileSecurityW.errcheck = _nonzero_success
680 advapi32.SetFileSecurityW.restype = wintypes.BOOL
681 advapi32.SetFileSecurityW.argtypes = (
682 wintypes.LPCWSTR, # LPCWSTR lpFileName
683 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation
684 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
685 )
687 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
688 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
689 advapi32.MakeAbsoluteSD.argtypes = (
690 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
691 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
692 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize
693 PACL, # PACL pDacl
694 wintypes.LPDWORD, # LPDWORD lpdwDaclSize
695 PACL, # PACL pSacl
696 wintypes.LPDWORD, # LPDWORD lpdwSaclSize
697 PSID, # PSID pOwner
698 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize
699 PSID, # PSID pPrimaryGroup
700 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize
701 )
703 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
704 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
705 advapi32.MakeSelfRelativeSD.argtypes = (
706 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
707 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
708 wintypes.LPDWORD, # LPDWORD lpdwBufferLength
709 )
711 advapi32.InitializeAcl.errcheck = _nonzero_success
712 advapi32.InitializeAcl.restype = wintypes.BOOL
713 advapi32.InitializeAcl.argtypes = (
714 PACL, # PACL pAcl,
715 wintypes.DWORD, # DWORD nAclLength,
716 wintypes.DWORD, # DWORD dwAclRevision
717 )
719 def CreateWellKnownSid(WellKnownSidType):
720 # return a SID for predefined aliases
721 pSid = (ctypes.c_char * 1)()
722 cbSid = wintypes.DWORD()
723 try:
724 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
725 except OSError as e:
726 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
727 raise
728 pSid = (ctypes.c_char * cbSid.value)()
729 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
730 return pSid[:]
732 def GetUserNameEx(NameFormat):
733 # return the user or other security principal associated with
734 # the calling thread
735 nSize = ctypes.pointer(ctypes.c_ulong(0))
736 try:
737 secur32.GetUserNameExW(NameFormat, None, nSize)
738 except OSError as e:
739 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined]
740 raise
741 if not nSize.contents.value:
742 return None
743 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
744 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
745 return lpNameBuffer.value
747 def LookupAccountName(lpSystemName, lpAccountName):
748 # return a security identifier (SID) for an account on a system
749 # and the name of the domain on which the account was found
750 cbSid = wintypes.DWORD(0)
751 cchReferencedDomainName = wintypes.DWORD(0)
752 peUse = wintypes.DWORD(0)
753 try:
754 advapi32.LookupAccountNameW(
755 lpSystemName,
756 lpAccountName,
757 None,
758 ctypes.byref(cbSid),
759 None,
760 ctypes.byref(cchReferencedDomainName),
761 ctypes.byref(peUse),
762 )
763 except OSError as e:
764 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
765 raise
766 Sid = ctypes.create_unicode_buffer("", cbSid.value)
767 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
768 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
769 success = advapi32.LookupAccountNameW(
770 lpSystemName,
771 lpAccountName,
772 pSid,
773 ctypes.byref(cbSid),
774 lpReferencedDomainName,
775 ctypes.byref(cchReferencedDomainName),
776 ctypes.byref(peUse),
777 )
778 if not success:
779 raise ctypes.WinError() # type:ignore[attr-defined]
780 return pSid, lpReferencedDomainName.value, peUse.value
782 def AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid):
783 # add an access-allowed access control entry (ACE)
784 # to an access control list (ACL)
785 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)
787 def GetFileSecurity(lpFileName, RequestedInformation):
788 # return information about the security of a file or directory
789 nLength = wintypes.DWORD(0)
790 try:
791 advapi32.GetFileSecurityW(
792 lpFileName,
793 RequestedInformation,
794 None,
795 0,
796 ctypes.byref(nLength),
797 )
798 except OSError as e:
799 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
800 raise
801 if not nLength.value:
802 return None
803 pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
804 advapi32.GetFileSecurityW(
805 lpFileName,
806 RequestedInformation,
807 pSecurityDescriptor,
808 nLength,
809 ctypes.byref(nLength),
810 )
811 return pSecurityDescriptor
813 def SetFileSecurity(lpFileName, RequestedInformation, pSecurityDescriptor):
814 # set the security of a file or directory object
815 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)
817 def SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted):
818 # set information in a discretionary access control list (DACL)
819 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)
821 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor):
822 # return a security descriptor in absolute format
823 # by using a security descriptor in self-relative format as a template
824 pAbsoluteSecurityDescriptor = None
825 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
826 pDacl = None
827 lpdwDaclSize = wintypes.DWORD(0)
828 pSacl = None
829 lpdwSaclSize = wintypes.DWORD(0)
830 pOwner = None
831 lpdwOwnerSize = wintypes.DWORD(0)
832 pPrimaryGroup = None
833 lpdwPrimaryGroupSize = wintypes.DWORD(0)
834 try:
835 advapi32.MakeAbsoluteSD(
836 pSelfRelativeSecurityDescriptor,
837 pAbsoluteSecurityDescriptor,
838 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
839 pDacl,
840 ctypes.byref(lpdwDaclSize),
841 pSacl,
842 ctypes.byref(lpdwSaclSize),
843 pOwner,
844 ctypes.byref(lpdwOwnerSize),
845 pPrimaryGroup,
846 ctypes.byref(lpdwPrimaryGroupSize),
847 )
848 except OSError as e:
849 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
850 raise
851 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
852 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
853 pDacl = ctypes.cast(pDaclData, PACL).contents
854 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
855 pSacl = ctypes.cast(pSaclData, PACL).contents
856 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
857 pOwner = ctypes.cast(pOwnerData, PSID)
858 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
859 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
860 advapi32.MakeAbsoluteSD(
861 pSelfRelativeSecurityDescriptor,
862 pAbsoluteSecurityDescriptor,
863 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
864 pDacl,
865 ctypes.byref(lpdwDaclSize),
866 pSacl,
867 ctypes.byref(lpdwSaclSize),
868 pOwner,
869 lpdwOwnerSize,
870 pPrimaryGroup,
871 ctypes.byref(lpdwPrimaryGroupSize),
872 )
873 return pAbsoluteSecurityDescriptor
875 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor):
876 # return a security descriptor in self-relative format
877 # by using a security descriptor in absolute format as a template
878 pSelfRelativeSecurityDescriptor = None
879 lpdwBufferLength = wintypes.DWORD(0)
880 try:
881 advapi32.MakeSelfRelativeSD(
882 pAbsoluteSecurityDescriptor,
883 pSelfRelativeSecurityDescriptor,
884 ctypes.byref(lpdwBufferLength),
885 )
886 except OSError as e:
887 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
888 raise
889 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
890 advapi32.MakeSelfRelativeSD(
891 pAbsoluteSecurityDescriptor,
892 pSelfRelativeSecurityDescriptor,
893 ctypes.byref(lpdwBufferLength),
894 )
895 return pSelfRelativeSecurityDescriptor
897 def NewAcl():
898 # return a new, initialized ACL (access control list) structure
899 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ?
900 acl_data = ctypes.create_string_buffer(nAclLength)
901 pAcl = ctypes.cast(acl_data, PACL).contents
902 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
903 return pAcl
905 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
906 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]
908 Acl = NewAcl()
909 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
910 AddAccessAllowedAce(
911 Acl,
912 ACL_REVISION,
913 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
914 SidUser,
915 )
917 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
918 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
919 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
920 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)
922 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
925def get_file_mode(fname: str) -> int:
926 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner.
928 Parameters
929 ----------
931 fname : unicode
932 The path to the file to get mode from
934 """
935 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we
936 # should tolerate the execute bit on the file's owner when validating permissions - thus
937 # the missing least significant bit on the third octal digit. In addition, we also tolerate
938 # the sticky bit being set, so the lsb from the fourth octal digit is also removed.
939 return (
940 stat.S_IMODE(os.stat(fname).st_mode) & 0o6677
941 ) # Use 4 octal digits since S_IMODE does the same
944allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1")
947@contextmanager
948def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
949 """Opens a file in the most restricted pattern available for
950 writing content. This limits the file mode to `0o0600` and yields
951 the resulting opened filed handle.
953 Parameters
954 ----------
956 fname : unicode
957 The path to the file to write
959 binary: boolean
960 Indicates that the file is binary
961 """
962 mode = "wb" if binary else "w"
963 encoding = None if binary else "utf-8"
964 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
965 try:
966 os.remove(fname)
967 except OSError:
968 # Skip any issues with the file not existing
969 pass
971 if os.name == "nt":
972 if allow_insecure_writes:
973 # Mounted file systems can have a number of failure modes inside this block.
974 # For windows machines in insecure mode we simply skip this to avoid failures :/
975 issue_insecure_write_warning()
976 else:
977 # Python on windows does not respect the group and public bits for chmod, so we need
978 # to take additional steps to secure the contents.
979 # Touch file pre-emptively to avoid editing permissions in open files in Windows
980 fd = os.open(fname, open_flag, 0o0600)
981 os.close(fd)
982 open_flag = os.O_WRONLY | os.O_TRUNC
983 win32_restrict_file_to_user(fname)
985 with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
986 if os.name != "nt":
987 # Enforce that the file got the requested permissions before writing
988 file_mode = get_file_mode(fname)
989 if file_mode != 0o0600: # noqa
990 if allow_insecure_writes:
991 issue_insecure_write_warning()
992 else:
993 msg = (
994 "Permissions assignment failed for secure file: '{file}'."
995 " Got '{permissions}' instead of '0o0600'.".format(
996 file=fname, permissions=oct(file_mode)
997 )
998 )
999 raise RuntimeError(msg)
1000 yield f
1003def issue_insecure_write_warning() -> None:
1004 """Issue an insecure write warning."""
1006 def format_warning(msg, *args, **kwargs):
1007 return str(msg) + "\n"
1009 warnings.formatwarning = format_warning # type:ignore[assignment]
1010 warnings.warn(
1011 "WARNING: Insecure writes have been enabled via environment variable "
1012 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
1013 "variable or set its value to 'False'."
1014 )