1"""Path utility functions."""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9from __future__ import annotations
10
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any, Iterator, Literal, Optional, overload
21
22import platformdirs
23
24from .utils import deprecation
25
# Short alias for the path-joining helper used throughout this module.
pjoin = os.path.join

# The application directory name is capitalized ("Jupyter") on Windows and on
# macOS, except when Python lives under the Homebrew prefix; every other
# platform uses the lowercase name.
APPNAME = (
    "Jupyter"
    if sys.platform == "win32"
    or (sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew"))
    else "jupyter"
)

# BSD-style "hidden file" stat flag. Take the value from the stat module when
# it provides one, otherwise fall back to the flag's numeric value.
UF_HIDDEN = getattr(stat, "UF_HIDDEN", 0x8000)
39
40
@overload
def envset(name: str, default: bool = False) -> bool: ...


@overload
def envset(name: str, default: Literal[None]) -> Optional[bool]: ...


def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
    """Interpret an environment variable as a boolean flag.

    Parameters
    ----------
    name : str
        Name of the environment variable to look up.
    default : bool or None
        Value returned unchanged when the variable is not defined at all.

    Returns
    -------
    bool or None
        ``default`` if the variable is undefined. Otherwise ``False`` when its
        value is one of 'no', 'n', 'false', 'off', '0' or '0.0' (compared
        case-insensitively) and ``True`` for any other value.
    """
    raw = os.environ.get(name)
    if raw is None:
        # Undefined variable: the caller's default decides.
        return default

    return raw.lower() not in {"no", "n", "false", "off", "0", "0.0"}
61
62
def use_platform_dirs() -> bool:
    """Report whether platformdirs should supply the system-specific paths.

    The answer is read from the ``JUPYTER_PLATFORM_DIRS`` environment variable
    and is ``False`` when that variable is undefined.

    The plan is for this to default to False in jupyter_core version 5 and to
    True in jupyter_core version 6.
    """
    return envset("JUPYTER_PLATFORM_DIRS", default=False)
70
71
def get_home_dir() -> str:
    """Return the user's home directory as a fully resolved path string."""
    # Resolving makes this work even when the home location is reached through
    # a symlink (e.g. /home -> /usr/home on FreeBSD).
    resolved_home = Path("~").expanduser().resolve()
    return str(resolved_home)
78
79
# Per-process cache of temporary directories, keyed by the name they were
# requested under; filled lazily by _mkdtemp_once().
_dtemps: dict[str, str] = {}
81
82
def _do_i_own(path: str) -> bool:
    """Best-effort check of whether the current user owns ``path``.

    When ``path`` does not exist, its closest existing ancestor is examined
    instead. Three strategies are tried in order, each one only if the
    previous could not produce an answer:

    1. compare the owner name with the login name,
    2. compare the owner uid with the effective uid,
    3. fall back to testing for write access.
    """
    target = Path(path).resolve()

    # Settle on the path itself or, failing that, the nearest ancestor that
    # exists (ending at the top of the path if nothing on the way exists).
    for candidate in (target, *target.parents):
        target = candidate
        if candidate.exists():
            break

    # Simplest check: owner by name. Neither lookup is implemented or
    # available everywhere, so any failure just moves on to the next strategy.
    try:
        owned_by_name = target.owner() == os.getlogin()
    except Exception:  # noqa: BLE001
        owned_by_name = None
    if owned_by_name is not None:
        return owned_by_name

    if hasattr(os, "geteuid"):
        try:
            return target.stat().st_uid == os.geteuid()
        except (NotImplementedError, OSError):
            # geteuid is not always implemented
            pass

    # No ownership check worked; settle for write access.
    return os.access(target, os.W_OK)
108
109
def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths.

    An explicit ``JUPYTER_PREFER_ENV_PATH`` setting always wins. Otherwise the
    environment is preferred when Python runs from a virtualenv, or from a
    non-base conda/mamba environment, whose prefix the current user owns.
    """
    environ = os.environ

    # An explicit setting signals user intent, so honor it as-is.
    if "JUPYTER_PREFER_ENV_PATH" in environ:
        return envset("JUPYTER_PREFER_ENV_PATH")

    # Python virtualenv (see https://docs.python.org/3/library/venv.html#venv-def)
    inside_venv = sys.prefix != sys.base_prefix
    if inside_venv and _do_i_own(sys.prefix):
        return True

    # conda/mamba environment that is not the root ("base") environment
    conda_prefix = environ.get("CONDA_PREFIX")
    inside_conda_env = (
        conda_prefix is not None
        and sys.prefix.startswith(conda_prefix)
        and environ.get("CONDA_DEFAULT_ENV", "base") != "base"
    )
    return inside_conda_env and _do_i_own(sys.prefix)
130
131
def _mkdtemp_once(name: str) -> str:
    """Return a temporary directory for ``name``, creating it on first use.

    Repeated calls with the same name within one process return the same
    directory; the directory name is prefixed with ``name`` and a dash.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]
143
144
def jupyter_config_dir() -> str:
    """Return the Jupyter config directory for this platform and user.

    Resolution order:

    1. a per-process temporary directory when JUPYTER_NO_CONFIG is set,
    2. JUPYTER_CONFIG_DIR when it is set,
    3. the platformdirs user config directory when platformdirs is enabled,
    4. ``.jupyter`` inside the home directory.
    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit_dir = os.environ.get("JUPYTER_CONFIG_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")
164
165
def jupyter_data_dir() -> str:
    """Return the directory for Jupyter data files for this platform and user.

    These are non-transient, non-configuration files.

    JUPYTER_DATA_DIR is returned when set; otherwise a platform-appropriate
    location is chosen.
    """
    explicit_dir = os.environ.get("JUPYTER_DATA_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()

    if sys.platform == "darwin":
        return str(Path(home) / "Library" / "Jupyter")

    if sys.platform == "win32":
        appdata = os.environ.get("APPDATA", None)
        if not appdata:
            # No APPDATA to build on: keep data next to the config.
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.: XDG_DATA_HOME or its conventional default
    xdg_data_home = os.environ.get("XDG_DATA_HOME", None) or pjoin(home, ".local", "share")
    return pjoin(xdg_data_home, "jupyter")
195
196
def jupyter_runtime_dir() -> str:
    """Return the directory for transient Jupyter runtime files.

    JUPYTER_RUNTIME_DIR is returned when set. Otherwise the result is
    ``runtime`` inside the data directory on all platforms; XDG_RUNTIME_DIR
    is no longer used after various problems.
    """
    explicit_dir = os.environ.get("JUPYTER_RUNTIME_DIR")
    if explicit_dir:
        return explicit_dir

    return pjoin(jupyter_data_dir(), "runtime")
211
212
# %PROGRAMDATA% is not safe by default, so trusting it requires an opt-in.
_use_programdata: bool = envset("JUPYTER_USE_PROGRAMDATA")
# Path string when %PROGRAMDATA% is being used, None otherwise.
_win_programdata: str | None = (
    os.environ.get("PROGRAMDATA", None) if os.name == "nt" and _use_programdata else None
)


if use_platform_dirs():
    if os.name != "nt" or _use_programdata:
        SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
            APPNAME, appauthor=False, multipath=True
        ).split(os.pathsep)
    else:
        # The PROGRAMDATA default used by site_* is not safe by default on
        # Windows, so fall back to the environment prefix.
        SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
else:
    deprecation(
        "Jupyter is migrating its paths to use standard platformdirs\n"
        "given by the platformdirs library.  To remove this warning and\n"
        "see the appropriate new directories, set the environment variable\n"
        "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
        "The use of platformdirs will be the default in `jupyter_core` v6"
    )
    if os.name != "nt":
        SYSTEM_JUPYTER_PATH = [
            "/usr/local/share/jupyter",
            "/usr/share/jupyter",
        ]
    elif _win_programdata:
        # PROGRAMDATA is not defined by default on XP, and not safe by default,
        # so it is only used after the opt-in above.
        SYSTEM_JUPYTER_PATH = [pjoin(_win_programdata, "jupyter")]
    else:
        SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]

# Data directory that belongs to the running Python environment.
ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]
250
251
def jupyter_path(*subdirs: str) -> list[str]:
    """Return the list of directories to search for data files.

    Paths come from four sources:

    - the $JUPYTER_PATH environment variable (always highest priority)
    - user directories (e.g. ~/.local/share/jupyter)
    - environment directories (e.g. {sys.prefix}/share/jupyter)
    - system-wide paths (e.g. /usr/local/share/jupyter)

    JUPYTER_PATH, when defined, has the highest priority and is purely
    additive.

    If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
    environment-level directories take priority over user-level directories.
    Setting JUPYTER_PREFER_ENV_PATH=0 explicitly prefers user directories.
    If Jupyter detects a virtualenv or conda environment, environment paths
    are also preferred to user paths; otherwise user paths come first.

    If the Python site.ENABLE_USER_SITE variable is True, the matching Python
    user site subdirectory is added to the user-level directories.

    System-wide directories, such as `/usr/local/share/jupyter`, are searched
    last.

    If ``*subdirs`` are given, that subdirectory is appended to each element.


    .. versionchanged:: 5.8

        On Windows, %PROGRAMDATA% will be used as a system-wide path only if
        the JUPYTER_USE_PROGRAMDATA env is set.
        By default, there is no default system-wide path on Windows and the env path
        is used instead.

    Examples:

    >>> jupyter_path()
    ['~/.local/share/jupyter', '/usr/local/share/jupyter']
    >>> jupyter_path('kernels')
    ['~/.local/share/jupyter/kernels', '/usr/local/share/jupyter/kernels']
    """
    search: list[str] = []

    # The explicit environment variable always comes first.
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, hence the
        # fallback to the USER_BASE attribute.
        userbase: Optional[str]
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            user_share = str(Path(userbase, "share", "jupyter"))
            if user_share not in user_dirs:
                user_dirs.append(user_share)

    # Windows usually has no separate 'system' prefix, so 'system' and 'env'
    # coincide; keep env intact in that case so it can still be preferred.
    if ENV_JUPYTER_PATH == SYSTEM_JUPYTER_PATH:
        env_dirs = ENV_JUPYTER_PATH
    else:
        env_dirs = [entry for entry in ENV_JUPYTER_PATH if entry not in SYSTEM_JUPYTER_PATH]

    # Environment and user directories, ordered by the preference flag.
    if prefer_environment_over_user():
        ordered_groups = (env_dirs, user_dirs)
    else:
        ordered_groups = (user_dirs, env_dirs)
    for group in ordered_groups:
        search.extend(group)

    # System paths go last; they can overlap with env, so skip duplicates.
    for system_dir in SYSTEM_JUPYTER_PATH:
        if system_dir not in search:
            search.append(system_dir)

    if not subdirs:
        return search
    return [pjoin(base, *subdirs) for base in search]
338
339
# Config directory that belongs to the running Python environment.
ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]

if use_platform_dirs():
    if os.name != "nt" or _use_programdata:
        SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
            APPNAME, appauthor=False, multipath=True
        ).split(os.pathsep)
    else:
        # The PROGRAMDATA default is not safe by default on Windows. Use a
        # copy of ENV instead of an empty list, since some callers may assume
        # this is non-empty.
        SYSTEM_CONFIG_PATH = list(ENV_CONFIG_PATH)
elif os.name != "nt":
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]
elif _win_programdata:
    # PROGRAMDATA is not defined by default on XP, and not safe by default,
    # so it is only used after an explicit opt-in.
    SYSTEM_CONFIG_PATH = [str(Path(_win_programdata, "jupyter"))]
else:
    # Make sure the list is never empty.
    SYSTEM_CONFIG_PATH = list(ENV_CONFIG_PATH)
363
364
def jupyter_config_path() -> list[str]:
    """Return the search path for Jupyter config files as a list.

    When JUPYTER_NO_CONFIG is set, the only entry is the blank config
    directory produced by jupyter_config_dir().

    Otherwise entries from JUPYTER_CONFIG_PATH come first. If the
    JUPYTER_PREFER_ENV_PATH environment variable is set, the
    environment-level directories have priority over user-level directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching Python
    user site subdirectory is added to the user-level directories.

    System-wide directories such as `/usr/local/etc/jupyter` are searched last.


    .. versionchanged:: 5.8

        On Windows, %PROGRAMDATA% will be used as a system-wide path only if
        the JUPYTER_USE_PROGRAMDATA env is set.
        By default, there is no system-wide config path on Windows.

    Examples:

    >>> jupyter_config_path()
    ['~/.jupyter', '~/.local/etc/jupyter', '/usr/local/etc/jupyter', '/etc/jupyter']

    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    search: list[str] = []

    # The explicit environment variable always comes first.
    explicit = os.environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, hence the
        # fallback to the USER_BASE attribute.
        userbase: Optional[str]
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            user_etc = str(Path(userbase, "etc", "jupyter"))
            if user_etc not in user_dirs:
                user_dirs.append(user_etc)

    # Windows usually has no separate 'system' prefix, so 'system' and 'env'
    # coincide; keep env intact in that case so it can still be preferred.
    env_is_system = ENV_CONFIG_PATH == SYSTEM_CONFIG_PATH
    if env_is_system:
        env_dirs = ENV_CONFIG_PATH
    else:
        env_dirs = [entry for entry in ENV_CONFIG_PATH if entry not in SYSTEM_CONFIG_PATH]

    # Environment and user directories, ordered by the preference flag.
    if prefer_environment_over_user():
        ordered_groups = (env_dirs, user_dirs)
    else:
        ordered_groups = (user_dirs, env_dirs)
    for group in ordered_groups:
        search.extend(group)

    # System paths go last, unless they are just the env paths again.
    if not env_is_system:
        search.extend(SYSTEM_CONFIG_PATH)
    return search
432
433
def exists(path: str) -> bool:
    """Report whether ``path`` exists, judged by whether ``os.lstat`` succeeds.

    Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers.
    """
    try:
        os.lstat(path)
    except OSError:
        found = False
    else:
        found = True
    return found
443
444
def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Is a file hidden?

    Only the file itself is inspected: its name (a leading dot counts as
    hidden) and the hidden bit of its file attributes. Combine this with a
    check of the containing directory, or use is_hidden() to check the file
    and its parent directories.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.

    Returns
    -------
    bool
        True when the file is considered hidden. A path that does not exist
        is reported as not hidden; other stat errors propagate.
    """
    if Path(abs_path).name.startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = Path(abs_path).stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    try:
        hidden = bool(
            stat_res.st_file_attributes  # type:ignore[union-attr]
            & stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
        )
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        hidden = False

    return hidden
488
489
def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Is a file hidden?

    Only the file itself is inspected: a leading dot in its name, a directory
    that cannot be read and entered, or the UF_HIDDEN stat flag all count as
    hidden. Combine this with a check of the containing directory, or use
    is_hidden() to check the file and its parent directories.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, or if it
        describes a symbolic link, this function will call stat() internally.

    Returns
    -------
    bool
        True when the file is considered hidden. A path that does not exist
        is reported as not hidden; other stat errors propagate.
    """
    if Path(abs_path).name.startswith("."):
        return True

    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = Path(abs_path).stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    # A directory that cannot be listed counts as hidden. Use an access
    # check, not an actual listing, in case of slow/large listings.
    unlistable_dir = stat.S_ISDIR(stat_res.st_mode) and not os.access(
        abs_path, os.X_OK | os.R_OK
    )
    if unlistable_dir:
        return True

    # Finally, honor the UF_HIDDEN flag when the stat result carries flags.
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)
528
529
# Platform-appropriate single-file check; is_hidden() builds on this alias.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix
534
535
def is_hidden(abs_path: str, abs_root: str = "") -> bool:
    """Is a file hidden or contained in a hidden directory?

    This will start with the rightmost path element and work backwards to the
    given root to see if a path is hidden or in a hidden directory. Hidden is
    determined by either name starting with '.' or the UF_HIDDEN flag as
    reported by stat.

    If abs_path is the same directory as abs_root, it will be visible even if
    that is a hidden folder. This only checks the visibility of files
    and directories *within* abs_root.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for. When empty (the default), the first component
        of ``abs_path`` followed by a separator is used as the root.

    Returns
    -------
    bool
        True if the path or any directory between it and the root is hidden.
    """
    abs_path = os.path.normpath(abs_path)
    # os.path.normpath("") returns ".", which would make the "no root given"
    # fallback below unreachable, so only normalize a root that was supplied.
    if abs_root:
        abs_root = os.path.normpath(abs_root)

    if abs_path == abs_root:
        return False

    if is_file_hidden(abs_path):
        return True

    if not abs_root:
        abs_root = abs_path.split(os.sep, 1)[0] + os.sep
    inside_root = abs_path[len(abs_root) :]
    if any(part.startswith(".") for part in Path(inside_root).parts):
        return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    path = str(Path(abs_path).parent)
    while path and path.startswith(abs_root) and path != abs_root:
        if Path(path).exists():
            try:
                # may fail on Windows junctions
                st = os.lstat(path)
            except OSError:
                return True
            if getattr(st, "st_flags", 0) & UF_HIDDEN:
                return True
        parent = str(Path(path).parent)
        if parent == path:
            # Reached the top of the path without meeting abs_root (the
            # parent of a path anchor is the anchor itself); stop here
            # instead of looping forever.
            break
        path = parent

    return False
588
589
def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict a Windows file to the current user and the Administrators group.

    The file's DACL is replaced by one holding two access-allowed entries:
    generic read, generic write and delete for the current user, and full
    access for the built-in Administrators group.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    When ``win32api`` cannot be imported, the work is delegated to the
    ctypes-based implementation.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        _win32_restrict_file_to_user_ctypes(fname)
        return

    import ntsecuritycon as con
    import win32security

    # Principals that receive access; no entry is added for "Everyone".
    admin_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user_sid, _domain, _type = win32security.LookupAccountName("", account_name)

    descriptor = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)

    # Build a fresh ACL: the user entry first, then the administrators entry.
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    acl = win32security.ACL()
    acl.AddAccessAllowedAce(win32security.ACL_REVISION, user_rights, user_sid)
    acl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admin_sid)

    # Install the new ACL as the descriptor's DACL and write it to the file.
    descriptor.SetSecurityDescriptorDacl(1, acl, 0)
    win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, descriptor)
632
633
634def _win32_restrict_file_to_user_ctypes(fname: str) -> None:
635 """Secure a windows file to read-only access for the user.
636
637 Follows guidance from win32 library creator:
638 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html
639
640 This method should be executed against an already generated file which
641 has no secrets written to it yet.
642
643 Parameters
644 ----------
645
646 fname : unicode
647 The path to the file to secure
648 """
649 import ctypes
650 from ctypes import wintypes
651
652 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined]
653 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined]
654
655 NameSamCompatible = 2
656 WinBuiltinAdministratorsSid = 26
657 DACL_SECURITY_INFORMATION = 4
658 ACL_REVISION = 2
659 ERROR_INSUFFICIENT_BUFFER = 122
660 ERROR_MORE_DATA = 234
661
662 SYNCHRONIZE = 0x100000
663 DELETE = 0x00010000
664 STANDARD_RIGHTS_REQUIRED = 0xF0000
665 STANDARD_RIGHTS_READ = 0x20000
666 STANDARD_RIGHTS_WRITE = 0x20000
667 FILE_READ_DATA = 1
668 FILE_READ_EA = 8
669 FILE_READ_ATTRIBUTES = 128
670 FILE_WRITE_DATA = 2
671 FILE_APPEND_DATA = 4
672 FILE_WRITE_EA = 16
673 FILE_WRITE_ATTRIBUTES = 256
674 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
675 FILE_GENERIC_READ = (
676 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
677 )
678 FILE_GENERIC_WRITE = (
679 STANDARD_RIGHTS_WRITE
680 | FILE_WRITE_DATA
681 | FILE_WRITE_ATTRIBUTES
682 | FILE_WRITE_EA
683 | FILE_APPEND_DATA
684 | SYNCHRONIZE
685 )
686
687 class ACL(ctypes.Structure):
688 _fields_ = [
689 ("AclRevision", wintypes.BYTE),
690 ("Sbz1", wintypes.BYTE),
691 ("AclSize", wintypes.WORD),
692 ("AceCount", wintypes.WORD),
693 ("Sbz2", wintypes.WORD),
694 ]
695
696 PSID = ctypes.c_void_p
697 PACL = ctypes.POINTER(ACL)
698 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)
699
700 def _nonzero_success(result: int, func: Any, args: Any) -> Any: # noqa: ARG001
701 if not result:
702 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined]
703 return args
704
705 secur32.GetUserNameExW.errcheck = _nonzero_success
706 secur32.GetUserNameExW.restype = wintypes.BOOL
707 secur32.GetUserNameExW.argtypes = (
708 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat
709 wintypes.LPWSTR, # LPWSTR lpNameBuffer,
710 wintypes.PULONG, # PULONG nSize
711 )
712
713 advapi32.CreateWellKnownSid.errcheck = _nonzero_success
714 advapi32.CreateWellKnownSid.restype = wintypes.BOOL
715 advapi32.CreateWellKnownSid.argtypes = (
716 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType
717 PSID, # PSID DomainSid
718 PSID, # PSID pSid
719 wintypes.PDWORD, # DWORD *cbSid
720 )
721
722 advapi32.LookupAccountNameW.errcheck = _nonzero_success
723 advapi32.LookupAccountNameW.restype = wintypes.BOOL
724 advapi32.LookupAccountNameW.argtypes = (
725 wintypes.LPWSTR, # LPCWSTR lpSystemName
726 wintypes.LPWSTR, # LPCWSTR lpAccountName
727 PSID, # PSID Sid
728 wintypes.LPDWORD, # LPDWORD cbSid
729 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName
730 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName
731 wintypes.LPDWORD, # PSID_NAME_USE peUse
732 )
733
734 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
735 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
736 advapi32.AddAccessAllowedAce.argtypes = (
737 PACL, # PACL pAcl
738 wintypes.DWORD, # DWORD dwAceRevision
739 wintypes.DWORD, # DWORD AccessMask
740 PSID, # PSID pSid
741 )
742
743 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
744 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
745 advapi32.SetSecurityDescriptorDacl.argtypes = (
746 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
747 wintypes.BOOL, # BOOL bDaclPresent
748 PACL, # PACL pDacl
749 wintypes.BOOL, # BOOL bDaclDefaulted
750 )
751
752 advapi32.GetFileSecurityW.errcheck = _nonzero_success
753 advapi32.GetFileSecurityW.restype = wintypes.BOOL
754 advapi32.GetFileSecurityW.argtypes = (
755 wintypes.LPCWSTR, # LPCWSTR lpFileName
756 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation
757 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
758 wintypes.DWORD, # DWORD nLength
759 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded
760 )
761
762 advapi32.SetFileSecurityW.errcheck = _nonzero_success
763 advapi32.SetFileSecurityW.restype = wintypes.BOOL
764 advapi32.SetFileSecurityW.argtypes = (
765 wintypes.LPCWSTR, # LPCWSTR lpFileName
766 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation
767 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor
768 )
769
770 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
771 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
772 advapi32.MakeAbsoluteSD.argtypes = (
773 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
774 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
775 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize
776 PACL, # PACL pDacl
777 wintypes.LPDWORD, # LPDWORD lpdwDaclSize
778 PACL, # PACL pSacl
779 wintypes.LPDWORD, # LPDWORD lpdwSaclSize
780 PSID, # PSID pOwner
781 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize
782 PSID, # PSID pPrimaryGroup
783 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize
784 )
785
786 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
787 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
788 advapi32.MakeSelfRelativeSD.argtypes = (
789 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor
790 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor
791 wintypes.LPDWORD, # LPDWORD lpdwBufferLength
792 )
793
794 advapi32.InitializeAcl.errcheck = _nonzero_success
795 advapi32.InitializeAcl.restype = wintypes.BOOL
796 advapi32.InitializeAcl.argtypes = (
797 PACL, # PACL pAcl,
798 wintypes.DWORD, # DWORD nAclLength,
799 wintypes.DWORD, # DWORD dwAclRevision
800 )
801
802 def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
803 # return a SID for predefined aliases
804 pSid = (ctypes.c_char * 1)()
805 cbSid = wintypes.DWORD()
806 try:
807 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
808 except OSError as e:
809 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
810 raise
811 pSid = (ctypes.c_char * cbSid.value)()
812 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
813 return pSid[:]
814
815 def GetUserNameEx(NameFormat: Any) -> Any:
816 # return the user or other security principal associated with
817 # the calling thread
818 nSize = ctypes.pointer(ctypes.c_ulong(0))
819 try:
820 secur32.GetUserNameExW(NameFormat, None, nSize)
821 except OSError as e:
822 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined]
823 raise
824 if not nSize.contents.value:
825 return None
826 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
827 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
828 return lpNameBuffer.value
829
830 def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
831 # return a security identifier (SID) for an account on a system
832 # and the name of the domain on which the account was found
833 cbSid = wintypes.DWORD(0)
834 cchReferencedDomainName = wintypes.DWORD(0)
835 peUse = wintypes.DWORD(0)
836 try:
837 advapi32.LookupAccountNameW(
838 lpSystemName,
839 lpAccountName,
840 None,
841 ctypes.byref(cbSid),
842 None,
843 ctypes.byref(cchReferencedDomainName),
844 ctypes.byref(peUse),
845 )
846 except OSError as e:
847 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
848 raise
849 Sid = ctypes.create_unicode_buffer("", cbSid.value)
850 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
851 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
852 success = advapi32.LookupAccountNameW(
853 lpSystemName,
854 lpAccountName,
855 pSid,
856 ctypes.byref(cbSid),
857 lpReferencedDomainName,
858 ctypes.byref(cchReferencedDomainName),
859 ctypes.byref(peUse),
860 )
861 if not success:
862 raise ctypes.WinError() # type:ignore[attr-defined]
863 return pSid, lpReferencedDomainName.value, peUse.value
864
865 def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
866 # add an access-allowed access control entry (ACE)
867 # to an access control list (ACL)
868 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)
869
870 def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
871 # return information about the security of a file or directory
872 nLength = wintypes.DWORD(0)
873 try:
874 advapi32.GetFileSecurityW(
875 lpFileName,
876 RequestedInformation,
877 None,
878 0,
879 ctypes.byref(nLength),
880 )
881 except OSError as e:
882 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
883 raise
884 if not nLength.value:
885 return None
886 pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
887 advapi32.GetFileSecurityW(
888 lpFileName,
889 RequestedInformation,
890 pSecurityDescriptor,
891 nLength,
892 ctypes.byref(nLength),
893 )
894 return pSecurityDescriptor
895
896 def SetFileSecurity(
897 lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
898 ) -> Any:
899 # set the security of a file or directory object
900 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)
901
902 def SetSecurityDescriptorDacl(
903 pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
904 ) -> Any:
905 # set information in a discretionary access control list (DACL)
906 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)
907
908 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
909 # return a security descriptor in absolute format
910 # by using a security descriptor in self-relative format as a template
911 pAbsoluteSecurityDescriptor = None
912 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
913 pDacl = None
914 lpdwDaclSize = wintypes.DWORD(0)
915 pSacl = None
916 lpdwSaclSize = wintypes.DWORD(0)
917 pOwner = None
918 lpdwOwnerSize = wintypes.DWORD(0)
919 pPrimaryGroup = None
920 lpdwPrimaryGroupSize = wintypes.DWORD(0)
921 try:
922 advapi32.MakeAbsoluteSD(
923 pSelfRelativeSecurityDescriptor,
924 pAbsoluteSecurityDescriptor,
925 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
926 pDacl,
927 ctypes.byref(lpdwDaclSize),
928 pSacl,
929 ctypes.byref(lpdwSaclSize),
930 pOwner,
931 ctypes.byref(lpdwOwnerSize),
932 pPrimaryGroup,
933 ctypes.byref(lpdwPrimaryGroupSize),
934 )
935 except OSError as e:
936 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined]
937 raise
938 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
939 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
940 pDacl = ctypes.cast(pDaclData, PACL).contents
941 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
942 pSacl = ctypes.cast(pSaclData, PACL).contents
943 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
944 pOwner = ctypes.cast(pOwnerData, PSID)
945 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
946 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
947 advapi32.MakeAbsoluteSD(
948 pSelfRelativeSecurityDescriptor,
949 pAbsoluteSecurityDescriptor,
950 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
951 pDacl,
952 ctypes.byref(lpdwDaclSize),
953 pSacl,
954 ctypes.byref(lpdwSaclSize),
955 pOwner,
956 lpdwOwnerSize,
957 pPrimaryGroup,
958 ctypes.byref(lpdwPrimaryGroupSize),
959 )
960 return pAbsoluteSecurityDescriptor
961
def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
    """Return a security descriptor in self-relative format.

    The given absolute-format security descriptor is used as the template.
    ``advapi32.MakeSelfRelativeSD`` is called once without a buffer, then
    again with a byte buffer whose length is taken from the ``DWORD`` size
    argument.

    Parameters
    ----------
    pAbsoluteSecurityDescriptor
        Security descriptor in absolute format.

    Returns
    -------
    The ``wintypes.BYTE`` array passed to the API as the output buffer.

    Raises
    ------
    OSError
        Re-raised from the first call when its ``winerror`` is not
        ``ERROR_INSUFFICIENT_BUFFER``.
    """
    required_length = wintypes.DWORD(0)

    # First pass: no output buffer; an "insufficient buffer" error is expected.
    try:
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            None,
            ctypes.byref(required_length),
        )
    except OSError as e:
        if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
            raise

    # Second pass: hand over a buffer of the length now stored in the DWORD.
    self_relative_sd = (wintypes.BYTE * required_length.value)()
    advapi32.MakeSelfRelativeSD(
        pAbsoluteSecurityDescriptor,
        self_relative_sd,
        ctypes.byref(required_length),
    )
    return self_relative_sd
983
def NewAcl() -> Any:
    """Return a new, initialized ACL (access control list) structure.

    A fixed-size byte buffer is created, viewed through a ``PACL`` pointer,
    and initialized with ``advapi32.InitializeAcl`` using ``ACL_REVISION``.
    """
    # TODO: calculate this: ctypes.sizeof(ACL) + ?
    acl_size = 32767
    backing_buffer = ctypes.create_string_buffer(acl_size)
    acl_pointer = ctypes.cast(backing_buffer, PACL)
    new_acl = acl_pointer.contents
    advapi32.InitializeAcl(new_acl, acl_size, ACL_REVISION)
    return new_acl
991
992 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
993 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]
994
995 Acl = NewAcl()
996 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
997 AddAccessAllowedAce(
998 Acl,
999 ACL_REVISION,
1000 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
1001 SidUser,
1002 )
1003
1004 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
1005 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
1006 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
1007 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)
1008
1009 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
1010
1011
def get_file_mode(fname: str) -> int:
    """Return the permission bits of ``fname``, ignoring filesystem quirks.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    Returns
    -------

    int
        The file's mode bits with the owner-execute bit and the sticky bit
        cleared.
    """
    # Mask applied to the four octal digits produced by S_IMODE:
    #   * the owner digit is 6 rather than 7, because some filesystems
    #     (e.g., CIFS) switch the owner execute bit on by themselves and
    #     that must not fail permission validation;
    #   * the leading digit is 6 rather than 7, so a sticky bit is
    #     tolerated as well.
    tolerant_mask = 0o6677
    raw_mode = Path(fname).stat().st_mode
    return stat.S_IMODE(raw_mode) & tolerant_mask
1029
1030
# Opt-in escape hatch: only the values "true" or "1" (case-insensitive) in
# JUPYTER_ALLOW_INSECURE_WRITES enable insecure writes; unset means disabled.
allow_insecure_writes = os.environ.get("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in (
    "true",
    "1",
)
1032
1033
@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open a file for writing in the most restricted pattern available.

    The file is requested with mode `0o0600` and the opened file handle is
    yielded to the caller.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary

    Raises
    ------

    RuntimeError
        On non-Windows systems, when the file did not end up with mode
        `0o0600` and insecure writes have not been allowed.
    """
    if binary:
        mode, encoding = "wb", None
    else:
        mode, encoding = "w", "utf-8"
    open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    # Remove any previous file first; failures (such as the file not
    # existing) are deliberately ignored.
    try:
        Path(fname).unlink()
    except OSError:
        pass

    on_windows = os.name == "nt"
    if on_windows and allow_insecure_writes:
        # Mounted file systems can fail in many ways while restricting
        # permissions, so in insecure mode on Windows that step is skipped.
        issue_insecure_write_warning()
    elif on_windows:
        # chmod on Windows does not respect the group and public bits, so
        # the file is created up front and then restricted to the current
        # user before it is opened for writing (permissions are not edited
        # on an open file).
        os.close(os.open(fname, open_flag, 0o0600))
        open_flag = os.O_WRONLY | os.O_TRUNC
        win32_restrict_file_to_user(fname)

    with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
        if not on_windows:
            # Check the resulting permissions before any content is written.
            file_mode = get_file_mode(fname)
            if file_mode != 0o0600:
                if not allow_insecure_writes:
                    msg = (
                        f"Permissions assignment failed for secure file: '{fname}'."
                        f" Got '{oct(file_mode)}' instead of '0o0600'."
                    )
                    raise RuntimeError(msg)
                issue_insecure_write_warning()
        yield f
1086
1087
def issue_insecure_write_warning() -> None:
    """Issue an insecure write warning.

    The warning is rendered as the bare message followed by a newline. The
    custom formatter is installed on ``warnings.formatwarning`` only for the
    duration of this call and the previous formatter is restored afterwards,
    so other warnings in the process keep their usual formatting.
    """

    def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:  # noqa: ARG001
        return str(msg) + "\n"

    previous_formatter = warnings.formatwarning
    warnings.formatwarning = format_warning  # type:ignore[assignment]
    try:
        warnings.warn(
            "WARNING: Insecure writes have been enabled via environment variable "
            "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
            "variable or set its value to 'False'.",
            stacklevel=2,
        )
    finally:
        # Restore even if the warning was escalated to an exception.
        warnings.formatwarning = previous_formatter