1"""Path utility functions."""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6# Derived from IPython.utils.path, which is
7# Copyright (c) IPython Development Team.
8# Distributed under the terms of the Modified BSD License.
9from __future__ import annotations
10
11import errno
12import os
13import site
14import stat
15import sys
16import tempfile
17import warnings
18from contextlib import contextmanager
19from pathlib import Path
20from typing import Any, Iterator, Optional
21
22import platformdirs
23
24from .utils import deprecation
25
# Short alias for joining path components, used throughout this module.
pjoin = os.path.join

# "Jupyter" is capitalized in paths only on Windows and on macOS (unless the
# interpreter prefix is under Homebrew); everywhere else it stays lowercase.
APPNAME = (
    "Jupyter"
    if sys.platform == "win32"
    or (sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew"))
    else "jupyter"
)

# UF_HIDDEN is the stat flag used by BSD to indicate hidden files.
# Use the stat module's value when it provides one, else the raw bit value.
UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)
39
40
def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
    """Interpret an environment variable as a boolean flag.

    The variable counts as "set" (True) unless its value, compared
    case-insensitively, is one of 'no', 'n', 'false', 'off', '0' or '0.0'.

    When the variable is not present in the environment at all, ``default``
    is returned unchanged.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default

    falsy_spellings = ("no", "n", "false", "off", "0", "0.0")
    return raw.lower() not in falsy_spellings
53
54
def use_platform_dirs() -> bool:
    """Report whether platformdirs should supply the system-specific paths.

    Controlled by the JUPYTER_PLATFORM_DIRS environment variable; when that
    variable is absent the answer is False.

    The plan is for the default to be False in jupyter_core version 5 and
    True in jupyter_core version 6.
    """
    enabled = envset("JUPYTER_PLATFORM_DIRS", False)
    return enabled  # type:ignore[return-value]
62
63
def get_home_dir() -> str:
    """Return the current user's home directory with symlinks resolved."""
    # Resolving keeps things working even when /home/ is a symlink to
    # /usr/home, as it is on FreeBSD, for example.
    return str(Path("~").expanduser().resolve())
70
71
# Per-process cache of temporary directories, keyed by the name passed to
# _mkdtemp_once, so repeated requests for the same name reuse one directory.
_dtemps: dict[str, str] = {}
73
74
75def _do_i_own(path: str) -> bool:
76 """Return whether the current user owns the given path"""
77 p = Path(path).resolve()
78
79 # walk up to first existing parent
80 while not p.exists() and p != p.parent:
81 p = p.parent
82
83 # simplest check: owner by name
84 # not always implemented or available
85 try:
86 return p.owner() == os.getlogin()
87 except Exception: # noqa: S110
88 pass
89
90 if hasattr(os, "geteuid"):
91 try:
92 st = p.stat()
93 return st.st_uid == os.geteuid()
94 except (NotImplementedError, OSError):
95 # geteuid not always implemented
96 pass
97
98 # no ownership checks worked, check write access
99 return os.access(p, os.W_OK)
100
101
def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths.

    An explicit JUPYTER_PREFER_ENV_PATH setting always wins. Otherwise the
    answer is True when running from a virtualenv, or from a non-base
    conda/mamba environment, whose prefix the current user owns.
    """
    # An explicit setting signals user intent, so honor it as-is.
    if "JUPYTER_PREFER_ENV_PATH" in os.environ:
        return envset("JUPYTER_PREFER_ENV_PATH")  # type:ignore[return-value]

    # In a Python virtualenv the prefixes differ
    # (see https://docs.python.org/3/library/venv.html#venv-def).
    in_virtualenv = sys.prefix != sys.base_prefix
    if in_virtualenv and _do_i_own(sys.prefix):
        return True

    # A conda/mamba environment other than the root ("base") environment.
    conda_prefix = os.environ.get("CONDA_PREFIX")
    in_conda_env = (
        conda_prefix is not None
        and sys.prefix.startswith(conda_prefix)
        and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
    )
    return in_conda_env and _do_i_own(sys.prefix)
122
123
def _mkdtemp_once(name: str) -> str:
    """Create a temporary directory for ``name``, or reuse the existing one.

    Calling this again with the same name in the same process returns the
    same directory.
    """
    if name not in _dtemps:
        # First request for this name: create the directory and remember it.
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]
135
136
def jupyter_config_dir() -> str:
    """Return the per-user Jupyter config directory.

    Resolution order:

    1. If JUPYTER_NO_CONFIG is set, a (reused) empty temporary directory.
    2. JUPYTER_CONFIG_DIR, if defined.
    3. The platformdirs user config directory, when platformdirs are enabled.
    4. ``.jupyter`` inside the home directory.
    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit = os.environ.get("JUPYTER_CONFIG_DIR")
    if explicit:
        return explicit

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")
156
157
def jupyter_data_dir() -> str:
    """Return the per-user directory for Jupyter data files.

    These are non-transient, non-configuration files.

    JUPYTER_DATA_DIR is returned when defined; otherwise a
    platform-appropriate location is chosen.
    """
    explicit = os.environ.get("JUPYTER_DATA_DIR")
    if explicit:
        return explicit

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()

    if sys.platform == "darwin":
        return str(Path(home, "Library", "Jupyter"))

    if sys.platform == "win32":
        appdata = os.environ.get("APPDATA", None)
        if not appdata:
            # Without APPDATA, keep data under the config directory.
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.: follow XDG_DATA_HOME, defaulting to
    # ~/.local/share when it is unset or empty.
    xdg = os.environ.get("XDG_DATA_HOME", None) or pjoin(home, ".local", "share")
    return pjoin(xdg, "jupyter")
187
188
def jupyter_runtime_dir() -> str:
    """Return the directory for transient Jupyter runtime files.

    JUPYTER_RUNTIME_DIR is returned when defined.

    Otherwise the result is ``runtime`` inside the data directory on all
    platforms; XDG_RUNTIME_DIR is no longer used after various problems.
    """
    explicit = os.environ.get("JUPYTER_RUNTIME_DIR")
    if explicit:
        return explicit

    return pjoin(jupyter_data_dir(), "runtime")
203
204
# System-wide data directories, computed once at import time.
if use_platform_dirs():
    SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)
else:
    # Legacy layout: tell the user about the upcoming platformdirs default.
    deprecation(
        "Jupyter is migrating its paths to use standard platformdirs\n"
        "given by the platformdirs library. To remove this warning and\n"
        "see the appropriate new directories, set the environment variable\n"
        "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
        "The use of platformdirs will be the default in `jupyter_core` v6"
    )
    if os.name != "nt":
        SYSTEM_JUPYTER_PATH = [
            "/usr/local/share/jupyter",
            "/usr/share/jupyter",
        ]
    else:
        programdata = os.environ.get("PROGRAMDATA", None)
        # PROGRAMDATA is not defined by default on XP; use the prefix then.
        SYSTEM_JUPYTER_PATH = (
            [pjoin(programdata, "jupyter")]
            if programdata
            else [str(Path(sys.prefix, "share", "jupyter"))]
        )

# Data directory belonging to the running Python environment (sys.prefix).
ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]
230
231
def jupyter_path(*subdirs: str) -> list[str]:
    """Return the ordered list of directories to search for data files.

    The entries are, from highest to lowest priority:

    1. Directories listed in the JUPYTER_PATH environment variable.
    2. User-level and environment-level directories. User-level comes first
       unless environment-level paths are preferred (for example through the
       JUPYTER_PREFER_ENV_PATH environment variable).
    3. The system-wide directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching
    ``share/jupyter`` directory under the Python user base is also added to
    the user-level directories.

    If ``*subdirs`` are given, they are joined onto every entry, e.g.
    ``jupyter_path('kernels')`` yields each search directory with
    ``kernels`` appended.
    """
    search: list[str] = []

    # Highest priority: the explicit environment variable.
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so fall back to
        # the USER_BASE attribute when it is.
        userbase: Optional[str]
        if hasattr(site, "getuserbase"):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "share", "jupyter"))
            if candidate not in user_dirs:
                user_dirs.append(candidate)

    # Environment-level directories that are not already system directories.
    env_dirs = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    if prefer_environment_over_user():
        ordered = env_dirs + user_dirs
    else:
        ordered = user_dirs + env_dirs
    search += ordered

    # Lowest priority: system-wide directories.
    search += SYSTEM_JUPYTER_PATH

    if subdirs:
        return [pjoin(base, *subdirs) for base in search]
    return search
289
290
# System-wide config directories, computed once at import time.
if use_platform_dirs():
    SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)
elif os.name == "nt":
    programdata = os.environ.get("PROGRAMDATA", None)
    # PROGRAMDATA is not defined by default on XP; no system path then.
    SYSTEM_CONFIG_PATH = [str(Path(programdata, "jupyter"))] if programdata else []
else:
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]

# Config directory belonging to the running Python environment (sys.prefix).
ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]
308
309
def jupyter_config_path() -> list[str]:
    """Return the ordered search path for Jupyter config files.

    When JUPYTER_NO_CONFIG is set, the result is only the blank config
    directory. Otherwise the entries are, from highest to lowest priority:

    1. Directories listed in the JUPYTER_CONFIG_PATH environment variable.
    2. User-level and environment-level directories. User-level comes first
       unless environment-level paths are preferred (for example through the
       JUPYTER_PREFER_ENV_PATH environment variable).
    3. The system-wide config directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching
    ``etc/jupyter`` directory under the Python user base is also added to
    the user-level directories.
    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    search: list[str] = []

    # Highest priority: the explicit environment variable.
    explicit = os.environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so fall back to
        # the USER_BASE attribute when it is.
        userbase: Optional[str]
        if hasattr(site, "getuserbase"):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "etc", "jupyter"))
            if candidate not in user_dirs:
                user_dirs.append(candidate)

    # Environment-level directories that are not already system directories.
    env_dirs = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    if prefer_environment_over_user():
        ordered = env_dirs + user_dirs
    else:
        ordered = user_dirs + env_dirs
    search += ordered

    # Lowest priority: system-wide directories.
    search += SYSTEM_CONFIG_PATH
    return search
355
356
def exists(path: str) -> bool:
    """Report whether ``path`` exists, judged by whether ``os.lstat`` succeeds.

    Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers.
    """
    try:
        os.lstat(path)
    except OSError:
        found = False
    else:
        found = True
    return found
366
367
def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Tell whether a single file is hidden, using Windows file attributes.

    Only the file itself is examined, so combine this with a check of the
    containing directory. Use is_hidden() instead to check the file and its
    parent directories.

    A name starting with '.' always counts as hidden. A path that does not
    exist is reported as not hidden.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = target.stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    try:
        hidden_bit = (
            stat_res.st_file_attributes  # type:ignore[union-attr]
            & stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
        )
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        return False

    return bool(hidden_bit)
411
412
def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Tell whether a single file is hidden, using POSIX conventions.

    Only the file itself is examined, so combine this with a check of the
    containing directory. Use is_hidden() instead to check the file and its
    parent directories.

    A file counts as hidden when its name starts with '.', when it is a
    directory that cannot be read and entered, or when its stat flags carry
    UF_HIDDEN. A path that does not exist is reported as not hidden.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    # Stat the path when no result was supplied, or when the supplied result
    # describes a symlink (Path.stat() follows the link).
    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = target.stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    # Directories must be listable; test access bits rather than actually
    # listing, in case of slow/large listings.
    unlistable_dir = stat.S_ISDIR(stat_res.st_mode) and not os.access(
        abs_path, os.X_OK | os.R_OK
    )
    if unlistable_dir:
        return True

    # Finally, the UF_HIDDEN flag.
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)
451
452
# Bind the platform-appropriate implementation under one public name.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix
457
458
def is_hidden(abs_path: str, abs_root: str = "") -> bool:
    """Is a file hidden or contained in a hidden directory?

    This will start with the rightmost path element and work backwards to the
    given root to see if a path is hidden or in a hidden directory. Hidden is
    determined by either name starting with '.' or the UF_HIDDEN flag as
    reported by stat.

    If abs_path is the same directory as abs_root, it will be visible even if
    that is a hidden folder. This only checks the visibility of files
    and directories *within* abs_root.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for. When empty (the default), the first component
        of abs_path (e.g. the filesystem root) is used as the root.

    Returns
    -------
    bool
        True if the path or any directory between it and the root is hidden.
    """
    abs_path = os.path.normpath(abs_path)
    # os.path.normpath("") returns ".", which would make the "no root given"
    # check below unreachable and disable the parent-directory walk, so only
    # normalize a root that was actually supplied.
    if abs_root:
        abs_root = os.path.normpath(abs_root)

    if abs_path == abs_root:
        return False

    if is_file_hidden(abs_path):
        return True

    if not abs_root:
        # Default root: everything up to and including the first separator.
        abs_root = abs_path.split(os.sep, 1)[0] + os.sep
    inside_root = abs_path[len(abs_root) :]
    if any(part.startswith(".") for part in Path(inside_root).parts):
        return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    path = str(Path(abs_path).parent)
    while path and path.startswith(abs_root) and path != abs_root:
        if not Path(path).exists():
            # Missing ancestors cannot carry flags; keep climbing.
            path = str(Path(path).parent)
            continue
        try:
            # may fail on Windows junctions
            st = os.lstat(path)
        except OSError:
            # An existing ancestor that cannot be inspected is treated as hidden.
            return True
        if getattr(st, "st_flags", 0) & UF_HIDDEN:
            return True
        path = str(Path(path).parent)

    return False
511
512
def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict a Windows file's access list to the current user and Administrators.

    The file's DACL is replaced by one that grants the current user generic
    read, generic write and delete rights, and grants the built-in
    Administrators group full access.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    When the pywin32 ``win32api`` module cannot be imported, the work is
    delegated to the ctypes-based implementation.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        return _win32_restrict_file_to_user_ctypes(fname)

    import ntsecuritycon as con
    import win32security

    dacl_info = win32security.DACL_SECURITY_INFORMATION
    revision = win32security.ACL_REVISION

    # Look up the two principals that will keep access.
    admins_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user_sid, _domain, _type = win32security.LookupAccountName("", account_name)

    descriptor = win32security.GetFileSecurity(fname, dacl_info)

    # Build a fresh ACL containing only the user and the administrators.
    acl = win32security.ACL()
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    acl.AddAccessAllowedAce(revision, user_rights, user_sid)
    acl.AddAccessAllowedAce(revision, con.FILE_ALL_ACCESS, admins_sid)

    # Install the new ACL as the file's DACL.
    descriptor.SetSecurityDescriptorDacl(1, acl, 0)
    win32security.SetFileSecurity(fname, dacl_info, descriptor)
    return None
555
556
def _win32_restrict_file_to_user_ctypes(fname: str) -> None:
    """Restrict a Windows file's access list to the current user and Administrators.

    ctypes-based implementation that calls advapi32/secur32 directly. It
    builds a new ACL that grants the built-in Administrators group full
    access and the current user generic read, generic write and delete
    rights, then installs that ACL as the file's DACL.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure

    Raises
    ------
    OSError
        Raised (via ctypes.WinError) when one of the wrapped Win32 calls
        reports failure, other than the expected "buffer too small" probes.
    """
    import ctypes
    from ctypes import wintypes

    advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)  # type:ignore[attr-defined]
    secur32 = ctypes.WinDLL("secur32", use_last_error=True)  # type:ignore[attr-defined]

    # Numeric values of the Win32 constants used below.
    NameSamCompatible = 2
    WinBuiltinAdministratorsSid = 26
    DACL_SECURITY_INFORMATION = 4
    ACL_REVISION = 2
    ERROR_INSUFFICIENT_BUFFER = 122
    ERROR_MORE_DATA = 234

    # Access-right bits, combined into the generic masks further down.
    SYNCHRONIZE = 0x100000
    DELETE = 0x00010000
    STANDARD_RIGHTS_REQUIRED = 0xF0000
    STANDARD_RIGHTS_READ = 0x20000
    STANDARD_RIGHTS_WRITE = 0x20000
    FILE_READ_DATA = 1
    FILE_READ_EA = 8
    FILE_READ_ATTRIBUTES = 128
    FILE_WRITE_DATA = 2
    FILE_APPEND_DATA = 4
    FILE_WRITE_EA = 16
    FILE_WRITE_ATTRIBUTES = 256
    FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
    FILE_GENERIC_READ = (
        STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
    )
    FILE_GENERIC_WRITE = (
        STANDARD_RIGHTS_WRITE
        | FILE_WRITE_DATA
        | FILE_WRITE_ATTRIBUTES
        | FILE_WRITE_EA
        | FILE_APPEND_DATA
        | SYNCHRONIZE
    )

    # ctypes layout of the ACL header structure.
    class ACL(ctypes.Structure):
        _fields_ = [
            ("AclRevision", wintypes.BYTE),
            ("Sbz1", wintypes.BYTE),
            ("AclSize", wintypes.WORD),
            ("AceCount", wintypes.WORD),
            ("Sbz2", wintypes.WORD),
        ]

    # Pointer type aliases used in the prototypes below.
    PSID = ctypes.c_void_p
    PACL = ctypes.POINTER(ACL)
    PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)

    # Shared errcheck hook: a zero (FALSE) result is turned into an OSError
    # built from the thread's last error; otherwise the call's args are returned.
    def _nonzero_success(result: int, func: Any, args: Any) -> Any:  # noqa: ARG001
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())  # type:ignore[attr-defined]
        return args

    # --- Prototypes: every function returns BOOL and uses the errcheck hook ---

    secur32.GetUserNameExW.errcheck = _nonzero_success
    secur32.GetUserNameExW.restype = wintypes.BOOL
    secur32.GetUserNameExW.argtypes = (
        ctypes.c_int,  # EXTENDED_NAME_FORMAT NameFormat
        wintypes.LPWSTR,  # LPWSTR lpNameBuffer,
        wintypes.PULONG,  # PULONG nSize
    )

    advapi32.CreateWellKnownSid.errcheck = _nonzero_success
    advapi32.CreateWellKnownSid.restype = wintypes.BOOL
    advapi32.CreateWellKnownSid.argtypes = (
        wintypes.DWORD,  # WELL_KNOWN_SID_TYPE WellKnownSidType
        PSID,  # PSID DomainSid
        PSID,  # PSID pSid
        wintypes.PDWORD,  # DWORD *cbSid
    )

    advapi32.LookupAccountNameW.errcheck = _nonzero_success
    advapi32.LookupAccountNameW.restype = wintypes.BOOL
    advapi32.LookupAccountNameW.argtypes = (
        wintypes.LPWSTR,  # LPCWSTR lpSystemName
        wintypes.LPWSTR,  # LPCWSTR lpAccountName
        PSID,  # PSID Sid
        wintypes.LPDWORD,  # LPDWORD cbSid
        wintypes.LPWSTR,  # LPCWSTR ReferencedDomainName
        wintypes.LPDWORD,  # LPDWORD cchReferencedDomainName
        wintypes.LPDWORD,  # PSID_NAME_USE peUse
    )

    advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
    advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
    advapi32.AddAccessAllowedAce.argtypes = (
        PACL,  # PACL pAcl
        wintypes.DWORD,  # DWORD dwAceRevision
        wintypes.DWORD,  # DWORD AccessMask
        PSID,  # PSID pSid
    )

    advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
    advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
    advapi32.SetSecurityDescriptorDacl.argtypes = (
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.BOOL,  # BOOL bDaclPresent
        PACL,  # PACL pDacl
        wintypes.BOOL,  # BOOL bDaclDefaulted
    )

    advapi32.GetFileSecurityW.errcheck = _nonzero_success
    advapi32.GetFileSecurityW.restype = wintypes.BOOL
    advapi32.GetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION RequestedInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.DWORD,  # DWORD nLength
        wintypes.LPDWORD,  # LPDWORD lpnLengthNeeded
    )

    advapi32.SetFileSecurityW.errcheck = _nonzero_success
    advapi32.SetFileSecurityW.restype = wintypes.BOOL
    advapi32.SetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION SecurityInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
    )

    advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
    advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
    advapi32.MakeAbsoluteSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwAbsoluteSecurityDescriptorSize
        PACL,  # PACL pDacl
        wintypes.LPDWORD,  # LPDWORD lpdwDaclSize
        PACL,  # PACL pSacl
        wintypes.LPDWORD,  # LPDWORD lpdwSaclSize
        PSID,  # PSID pOwner
        wintypes.LPDWORD,  # LPDWORD lpdwOwnerSize
        PSID,  # PSID pPrimaryGroup
        wintypes.LPDWORD,  # LPDWORD lpdwPrimaryGroupSize
    )

    advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
    advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
    advapi32.MakeSelfRelativeSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwBufferLength
    )

    advapi32.InitializeAcl.errcheck = _nonzero_success
    advapi32.InitializeAcl.restype = wintypes.BOOL
    advapi32.InitializeAcl.argtypes = (
        PACL,  # PACL pAcl,
        wintypes.DWORD,  # DWORD nAclLength,
        wintypes.DWORD,  # DWORD dwAclRevision
    )

    # --- Python wrappers. Most use a two-call pattern: a probe call that is
    # expected to fail with a "buffer too small" error while filling in the
    # required size, followed by the real call with a buffer of that size. ---

    def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
        # return a SID for predefined aliases
        pSid = (ctypes.c_char * 1)()
        cbSid = wintypes.DWORD()
        try:
            # Probe call; cbSid receives the size needed for the SID.
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
            pSid = (ctypes.c_char * cbSid.value)()
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        # Slicing the char array yields the SID as a bytes object.
        return pSid[:]

    def GetUserNameEx(NameFormat: Any) -> Any:
        # return the user or other security principal associated with
        # the calling thread
        nSize = ctypes.pointer(ctypes.c_ulong(0))
        try:
            # Probe call without a buffer; nSize receives the required length.
            secur32.GetUserNameExW(NameFormat, None, nSize)
        except OSError as e:
            if e.winerror != ERROR_MORE_DATA:  # type:ignore[attr-defined]
                raise
        if not nSize.contents.value:
            return None
        lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
        secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
        return lpNameBuffer.value

    def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
        # return a security identifier (SID) for an account on a system
        # and the name of the domain on which the account was found
        cbSid = wintypes.DWORD(0)
        cchReferencedDomainName = wintypes.DWORD(0)
        peUse = wintypes.DWORD(0)
        try:
            # Probe call; fills in the SID size and the domain-name length.
            advapi32.LookupAccountNameW(
                lpSystemName,
                lpAccountName,
                None,
                ctypes.byref(cbSid),
                None,
                ctypes.byref(cchReferencedDomainName),
                ctypes.byref(peUse),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # The SID storage is allocated as a unicode buffer of cbSid.value
        # characters and handed over as an untyped pointer.
        Sid = ctypes.create_unicode_buffer("", cbSid.value)
        pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
        lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
        success = advapi32.LookupAccountNameW(
            lpSystemName,
            lpAccountName,
            pSid,
            ctypes.byref(cbSid),
            lpReferencedDomainName,
            ctypes.byref(cchReferencedDomainName),
            ctypes.byref(peUse),
        )
        # NOTE(review): the errcheck hook already raises on a zero result, so
        # this guard looks redundant -- confirm before relying on it.
        if not success:
            raise ctypes.WinError()  # type:ignore[attr-defined]
        return pSid, lpReferencedDomainName.value, peUse.value

    def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
        # add an access-allowed access control entry (ACE)
        # to an access control list (ACL)
        advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)

    def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
        # return information about the security of a file or directory
        nLength = wintypes.DWORD(0)
        try:
            # Probe call with a zero-length buffer; nLength receives the size.
            advapi32.GetFileSecurityW(
                lpFileName,
                RequestedInformation,
                None,
                0,
                ctypes.byref(nLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        if not nLength.value:
            return None
        pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
        advapi32.GetFileSecurityW(
            lpFileName,
            RequestedInformation,
            pSecurityDescriptor,
            nLength,
            ctypes.byref(nLength),
        )
        return pSecurityDescriptor

    def SetFileSecurity(
        lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
    ) -> Any:
        # set the security of a file or directory object
        advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)

    def SetSecurityDescriptorDacl(
        pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
    ) -> Any:
        # set information in a discretionary access control list (DACL)
        advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)

    def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
        # return a security descriptor in absolute format
        # by using a security descriptor in self-relative format as a template
        pAbsoluteSecurityDescriptor = None
        lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
        pDacl = None
        lpdwDaclSize = wintypes.DWORD(0)
        pSacl = None
        lpdwSaclSize = wintypes.DWORD(0)
        pOwner = None
        lpdwOwnerSize = wintypes.DWORD(0)
        pPrimaryGroup = None
        lpdwPrimaryGroupSize = wintypes.DWORD(0)
        try:
            # Probe call with no buffers; every size output gets filled in.
            advapi32.MakeAbsoluteSD(
                pSelfRelativeSecurityDescriptor,
                pAbsoluteSecurityDescriptor,
                ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
                pDacl,
                ctypes.byref(lpdwDaclSize),
                pSacl,
                ctypes.byref(lpdwSaclSize),
                pOwner,
                ctypes.byref(lpdwOwnerSize),
                pPrimaryGroup,
                ctypes.byref(lpdwPrimaryGroupSize),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # Allocate each component buffer with the size reported by the probe.
        pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
        pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
        pDacl = ctypes.cast(pDaclData, PACL).contents
        pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
        pSacl = ctypes.cast(pSaclData, PACL).contents
        pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
        pOwner = ctypes.cast(pOwnerData, PSID)
        pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
        pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
        # NOTE(review): lpdwOwnerSize is passed without ctypes.byref() here,
        # unlike the other size arguments; ctypes is documented to apply byref
        # automatically for POINTER argtypes, so this should be equivalent --
        # confirm.
        advapi32.MakeAbsoluteSD(
            pSelfRelativeSecurityDescriptor,
            pAbsoluteSecurityDescriptor,
            ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
            pDacl,
            ctypes.byref(lpdwDaclSize),
            pSacl,
            ctypes.byref(lpdwSaclSize),
            pOwner,
            lpdwOwnerSize,
            pPrimaryGroup,
            ctypes.byref(lpdwPrimaryGroupSize),
        )
        return pAbsoluteSecurityDescriptor

    def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
        # return a security descriptor in self-relative format
        # by using a security descriptor in absolute format as a template
        pSelfRelativeSecurityDescriptor = None
        lpdwBufferLength = wintypes.DWORD(0)
        try:
            # Probe call; lpdwBufferLength receives the required size.
            advapi32.MakeSelfRelativeSD(
                pAbsoluteSecurityDescriptor,
                pSelfRelativeSecurityDescriptor,
                ctypes.byref(lpdwBufferLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            pSelfRelativeSecurityDescriptor,
            ctypes.byref(lpdwBufferLength),
        )
        return pSelfRelativeSecurityDescriptor

    def NewAcl() -> Any:
        # return a new, initialized ACL (access control list) structure
        nAclLength = 32767  # TODO: calculate this: ctypes.sizeof(ACL) + ?
        acl_data = ctypes.create_string_buffer(nAclLength)
        pAcl = ctypes.cast(acl_data, PACL).contents
        advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
        return pAcl

    # --- Main sequence ---

    # Identify the two principals that keep access to the file.
    SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
    SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]

    # New ACL: full access for Administrators, read/write/delete for the user.
    Acl = NewAcl()
    AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
    AddAccessAllowedAce(
        Acl,
        ACL_REVISION,
        FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
        SidUser,
    )

    # Fetch the file's descriptor, convert it to absolute form so the DACL
    # can be set, then convert back to self-relative form.
    SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
    AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
    SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
    SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)

    # Write the updated descriptor back to the file.
    SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)
933
934
def get_file_mode(fname: str) -> int:
    """Return the permission bits of ``fname``, ignoring filesystem quirks.

    The owner's execute bit and the sticky bit are masked out of the result.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    """
    permission_bits = stat.S_IMODE(Path(fname).stat().st_mode)

    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so
    # the owner's execute bit is tolerated when validating permissions: the
    # least significant bit of the third octal digit is dropped. The sticky
    # bit is tolerated too, so the lsb of the fourth octal digit is dropped.
    # Four octal digits are used since S_IMODE does the same.
    tolerated_mask = 0o6677
    return permission_bits & tolerated_mask
952
953
# Opt-out switch for the permission enforcement in secure_write, read once at
# import time: enabled when JUPYTER_ALLOW_INSECURE_WRITES is "true" or "1"
# (case-insensitive).
allow_insecure_writes = os.environ.get("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in (
    "true",
    "1",
)
955
956
@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open ``fname`` for writing with the most restrictive permissions available.

    Any existing file is removed first. The file is then created with mode
    `0o0600` and the opened file handle is yielded. On non-Windows systems
    the resulting mode is verified before the handle is yielded; on Windows
    the file's access list is restricted instead.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary

    Raises
    ------
    RuntimeError
        On non-Windows systems, when the file did not get mode `0o0600` and
        insecure writes are not allowed.
    """
    if binary:
        mode, encoding = "wb", None
    else:
        mode, encoding = "w", "utf-8"
    open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    try:
        Path(fname).unlink()
    except OSError:
        # Skip any issues with the file not existing
        pass

    on_windows = os.name == "nt"

    if on_windows:
        if allow_insecure_writes:
            # Mounted file systems can have a number of failure modes inside this block.
            # For windows machines in insecure mode we simply skip this to avoid failures :/
            issue_insecure_write_warning()
        else:
            # Python on windows does not respect the group and public bits for chmod, so we need
            # to take additional steps to secure the contents.
            # Touch file pre-emptively to avoid editing permissions in open files in Windows
            os.close(os.open(fname, open_flag, 0o0600))
            open_flag = os.O_WRONLY | os.O_TRUNC
            win32_restrict_file_to_user(fname)

    with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
        if not on_windows:
            # Enforce that the file got the requested permissions before writing
            file_mode = get_file_mode(fname)
            if file_mode != 0o0600:
                if not allow_insecure_writes:
                    msg = (
                        f"Permissions assignment failed for secure file: '{fname}'."
                        f" Got '{oct(file_mode)}' instead of '0o0600'."
                    )
                    raise RuntimeError(msg)
                issue_insecure_write_warning()
        yield f
1009
1010
def issue_insecure_write_warning() -> None:
    """Issue an insecure write warning.

    The warning is rendered as the bare message followed by a newline,
    without the usual file/line/category prefix. The custom formatter is
    installed only while this warning is emitted and the previous
    ``warnings.formatwarning`` is restored afterwards, so the formatting of
    other warnings in the process is left untouched.
    """

    def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:  # noqa: ARG001
        return str(msg) + "\n"

    previous_formatter = warnings.formatwarning
    warnings.formatwarning = format_warning  # type:ignore[assignment]
    try:
        warnings.warn(
            "WARNING: Insecure writes have been enabled via environment variable "
            "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
            "variable or set its value to 'False'.",
            stacklevel=2,
        )
    finally:
        # Restore even if the warning was escalated to an exception by a filter.
        warnings.formatwarning = previous_formatter