Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_core/paths.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

469 statements  

1"""Path utility functions.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6# Derived from IPython.utils.path, which is 

7# Copyright (c) IPython Development Team. 

8# Distributed under the terms of the Modified BSD License. 

9from __future__ import annotations 

10 

11import errno 

12import os 

13import site 

14import stat 

15import sys 

16import tempfile 

17import warnings 

18from collections.abc import Iterator 

19from contextlib import contextmanager 

20from pathlib import Path 

21from typing import Any, overload 

22 

23import platformdirs 

24 

# Short alias for joining path components, used throughout this module.
pjoin = os.path.join

# Directory name used for Jupyter in platform paths.  It is capitalized only on
# Windows and on macOS interpreters whose prefix is not under /opt/homebrew;
# every other platform gets the lowercase spelling.
APPNAME = (
    "Jupyter"
    if sys.platform == "win32"
    or (sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew"))
    else "jupyter"
)

# UF_HIDDEN is the stat flag BSD uses to mark hidden files.  Take it from the
# stat module when it is exposed there, otherwise fall back to its literal value.
UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768)

38 

39 

@overload
def envset(name: str, default: bool = False) -> bool: ...


@overload
def envset(name: str, default: None) -> bool | None: ...


def envset(name: str, default: bool | None = False) -> bool | None:
    """Interpret an environment variable as a boolean flag.

    Parameters
    ----------
    name : str
        Name of the environment variable to look up.
    default : bool or None
        Value returned unchanged when the variable is not defined at all.

    Returns
    -------
    bool or None
        ``default`` if the variable is undefined.  Otherwise ``False`` when
        its value, compared case-insensitively, is one of 'no', 'n', 'false',
        'off', '0' or '0.0', and ``True`` for any other value.
    """
    try:
        raw_value = os.environ[name]
    except KeyError:
        return default

    return raw_value.lower() not in ("no", "n", "false", "off", "0", "0.0")

60 

61 

def use_platform_dirs() -> bool:
    """Report whether platformdirs should supply the system-specific paths.

    The decision comes from the JUPYTER_PLATFORM_DIRS environment variable,
    read through ``envset``; when the variable is not defined the answer is
    False.
    """
    enabled = envset("JUPYTER_PLATFORM_DIRS", False)
    return enabled

68 

69 

def get_home_dir() -> str:
    """Return the user's home directory as a fully resolved path string."""
    # resolve() follows symlinks, so the real location is reported even when
    # /home is a symlink to /usr/home, as it is on FreeBSD for example.
    return str(Path("~").expanduser().resolve())

76 

77 

# Cache of temporary directories created by _mkdtemp_once, keyed by the
# ``name`` that was requested, so repeated requests reuse one directory.
_dtemps: dict[str, str] = {}

79 

80 

def _do_i_own(path: str) -> bool:
    """Return whether the current user owns ``path``.

    If ``path`` does not exist, its nearest existing ancestor is examined
    instead.  Three checks are tried in order, each one used only when the
    previous one could not be carried out: owner name against login name,
    owner uid against effective uid, and finally plain write access.
    """
    target = Path(path).resolve()

    # Climb towards the filesystem root until something exists (the root is
    # its own parent, which ends the climb).
    while True:
        if target.exists() or target == target.parent:
            break
        target = target.parent

    # Simplest check: owner by name.  Neither call is always implemented or
    # available, so any failure just falls through to the next strategy.
    try:
        return target.owner() == os.getlogin()
    except Exception:  # noqa: S110
        pass

    if hasattr(os, "geteuid"):
        try:
            return target.stat().st_uid == os.geteuid()
        except (NotImplementedError, OSError):
            # geteuid is not always implemented
            pass

    # No ownership check worked; settle for write access.
    return os.access(target, os.W_OK)

106 

107 

def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths.

    An explicit JUPYTER_PREFER_ENV_PATH setting always wins.  Without it,
    the answer is True when Python runs from a virtualenv, or from a
    non-base conda/mamba environment, whose prefix the current user owns.
    """
    environ = os.environ

    # An explicit setting signals user intent, so honor it as-is.
    if "JUPYTER_PREFER_ENV_PATH" in environ:
        return envset("JUPYTER_PREFER_ENV_PATH")

    prefix = sys.prefix

    # A prefix that differs from the base prefix means a Python virtualenv
    # (see https://docs.python.org/3/library/venv.html#venv-def).
    if prefix != sys.base_prefix and _do_i_own(prefix):
        return True

    # Otherwise only a conda/mamba environment other than the root ("base")
    # one qualifies, and only if the interpreter actually lives inside it.
    conda_prefix = environ.get("CONDA_PREFIX")
    if conda_prefix is None:
        return False
    if not prefix.startswith(conda_prefix):
        return False
    if environ.get("CONDA_DEFAULT_ENV", "base") == "base":
        return False
    return _do_i_own(prefix)

128 

129 

def _mkdtemp_once(name: str) -> str:
    """Create a temporary directory for ``name``, or reuse the existing one.

    Within a single process, every call with the same ``name`` returns the
    same directory; the directory is created on the first call, with
    ``name`` followed by a dash as its prefix.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]

141 

142 

def jupyter_config_dir() -> str:
    """Return the Jupyter config directory for this platform and user.

    Resolution order:

    1. a throwaway temporary directory when JUPYTER_NO_CONFIG is non-empty,
    2. JUPYTER_CONFIG_DIR when it is non-empty,
    3. the platformdirs user config directory when platform dirs are enabled,
    4. ``.jupyter`` inside the user's home directory.
    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit_dir = os.environ.get("JUPYTER_CONFIG_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")

162 

163 

def jupyter_data_dir() -> str:
    """Return the directory for Jupyter data files for this platform and user.

    These are non-transient, non-configuration files.

    JUPYTER_DATA_DIR is returned when it is non-empty.  Otherwise the
    platformdirs user data directory is used if platform dirs are enabled,
    and failing that a per-platform default:

    - macOS: ``Library/Jupyter`` under the home directory
    - Windows: ``jupyter`` under %APPDATA%, or ``data`` inside the config
      directory when APPDATA is not set
    - elsewhere: ``jupyter`` under XDG_DATA_HOME, which defaults to
      ``.local/share`` under the home directory
    """
    explicit_dir = os.environ.get("JUPYTER_DATA_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()

    if sys.platform == "darwin":
        return str(Path(home, "Library", "Jupyter"))

    # Bug in mypy which thinks it's unreachable: https://github.com/python/mypy/issues/10773
    if sys.platform == "win32":
        appdata = os.environ.get("APPDATA", None)
        if not appdata:
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.
    # Bug in mypy which thinks it's unreachable: https://github.com/python/mypy/issues/10773
    xdg_base = os.environ.get("XDG_DATA_HOME", None) or pjoin(home, ".local", "share")
    return pjoin(xdg_base, "jupyter")

195 

196 

def jupyter_runtime_dir() -> str:
    """Return the directory for transient Jupyter runtime files.

    JUPYTER_RUNTIME_DIR is returned when it is non-empty.

    Otherwise the result is ``runtime`` inside the data directory, on every
    platform; XDG_RUNTIME_DIR is no longer consulted after various problems.
    """
    explicit_dir = os.environ.get("JUPYTER_RUNTIME_DIR")
    if explicit_dir:
        return explicit_dir
    return pjoin(jupyter_data_dir(), "runtime")

211 

212 

# %PROGRAMDATA% is not safe by default, so trusting it requires an explicit
# opt-in through JUPYTER_USE_PROGRAMDATA.
_use_programdata: bool = envset("JUPYTER_USE_PROGRAMDATA")
# Path string when %PROGRAMDATA% is in use (Windows with the opt-in set and the
# variable defined), None otherwise.
_win_programdata: str | None = (
    os.environ.get("PROGRAMDATA", None) if os.name == "nt" and _use_programdata else None
)


# System-wide data directories, chosen once at import time.
if use_platform_dirs():
    if os.name != "nt" or _use_programdata:
        SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
            APPNAME, appauthor=False, multipath=True
        ).split(os.pathsep)
    else:
        # the default PROGRAMDATA used by site_* is not safe by default on Windows
        SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
elif os.name != "nt":
    # default dirs
    SYSTEM_JUPYTER_PATH = [
        "/usr/local/share/jupyter",
        "/usr/share/jupyter",
    ]
elif _win_programdata:
    SYSTEM_JUPYTER_PATH = [pjoin(_win_programdata, "jupyter")]
else:
    # PROGRAMDATA is not defined by default on XP, and not safe by default,
    # so fall back to the environment's own share directory
    SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]

# Data directory belonging to the running Python environment.
ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]

244 

245 

def jupyter_path(*subdirs: str) -> list[str]:
    """Return the list of directories to search for data files.

    Paths come from four sources:

    - the $JUPYTER_PATH environment variable (always highest priority)
    - user directories (e.g. ~/.local/share/jupyter)
    - environment directories (e.g. {sys.prefix}/share/jupyter)
    - system-wide paths (e.g. /usr/local/share/jupyter)

    JUPYTER_PATH, if defined, has the highest priority and is purely
    additive.

    If the JUPYTER_PREFER_ENV_PATH environment variable is set, the
    environment-level directories take priority over user-level directories.
    Setting JUPYTER_PREFER_ENV_PATH=0 explicitly prefers user directories.
    If Jupyter detects a virtualenv or conda environment, environment paths
    are also preferred to user paths; otherwise user paths come first.

    If the Python site.ENABLE_USER_SITE variable is True, the matching Python
    user site subdirectory is added to the user-level directories.

    Finally, system-wide directories such as `/usr/local/share/jupyter` are
    searched.

    If ``*subdirs`` are given, that subdirectory is appended to each element.


    .. versionchanged:: 5.8

        On Windows, %PROGRAMDATA% will be used as a system-wide path only if
        the JUPYTER_USE_PROGRAMDATA env is set.
        By default, there is no default system-wide path on Windows and the
        env path is used instead.

    Examples (illustrative: Linux defaults, user paths preferred):

    >>> jupyter_path()
    ['~/.local/share/jupyter', '{sys.prefix}/share/jupyter', '/usr/local/share/jupyter', '/usr/share/jupyter']
    >>> jupyter_path('kernels')
    ['~/.local/share/jupyter/kernels', '{sys.prefix}/share/jupyter/kernels', '/usr/local/share/jupyter/kernels', '/usr/share/jupyter/kernels']
    """
    search: list[str] = []

    # The explicit environment variable always comes first.
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so check for it
        # and fall back to the USER_BASE attribute.
        userbase: str | None
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "share", "jupyter"))
            if candidate not in user_dirs:
                user_dirs.append(candidate)

    # Windows usually has no separate 'system' prefix, so 'system' and 'env'
    # coincide; keep env intact in that case so it can still be preferred.
    if ENV_JUPYTER_PATH == SYSTEM_JUPYTER_PATH:
        env_dirs = ENV_JUPYTER_PATH
    else:
        env_dirs = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    # Environment and user directories, ordered by preference.
    if prefer_environment_over_user():
        ordered_groups = (env_dirs, user_dirs)
    else:
        ordered_groups = (user_dirs, env_dirs)
    for group in ordered_groups:
        search.extend(group)

    # System paths go last; they can overlap with env, so skip duplicates.
    for system_dir in SYSTEM_JUPYTER_PATH:
        if system_dir not in search:
            search.append(system_dir)

    if not subdirs:
        return search
    return [pjoin(base, *subdirs) for base in search]

332 

333 

# Config directory belonging to the running Python environment.
ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]

# System-wide config directories, chosen once at import time.
if use_platform_dirs():
    if os.name != "nt" or _use_programdata:
        SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
            APPNAME, appauthor=False, multipath=True
        ).split(os.pathsep)
    else:
        # default PROGRAMDATA is not safe by default on Windows;
        # copy ENV to avoid an empty list, since some may assume this is non-empty
        SYSTEM_CONFIG_PATH = list(ENV_CONFIG_PATH)
elif os.name != "nt":
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]
elif _win_programdata:
    SYSTEM_CONFIG_PATH = [str(Path(_win_programdata, "jupyter"))]
else:
    # PROGRAMDATA is not defined by default on XP, and not safe by default,
    # but the list must not be empty
    SYSTEM_CONFIG_PATH = list(ENV_CONFIG_PATH)

357 

358 

def jupyter_config_path() -> list[str]:
    """Return the search path for Jupyter config files as a list.

    When JUPYTER_NO_CONFIG is non-empty, the list holds only the directory
    returned by ``jupyter_config_dir()``.

    Otherwise entries from JUPYTER_CONFIG_PATH, if defined, come first,
    followed by user-level and environment-level directories.  If the
    JUPYTER_PREFER_ENV_PATH environment variable is set, the
    environment-level directories have priority over user-level directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching Python
    user site subdirectory is added to the user-level directories.

    Finally, system-wide directories such as `/usr/local/etc/jupyter` are
    searched.


    .. versionchanged:: 5.8

        On Windows, %PROGRAMDATA% will be used as a system-wide path only if
        the JUPYTER_USE_PROGRAMDATA env is set.
        By default, there is no system-wide config path on Windows.

    Examples:

    >>> jupyter_config_path()
    ['~/.jupyter', '~/.local/etc/jupyter', '/usr/local/etc/jupyter', '/etc/jupyter']

    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    search: list[str] = []

    # The explicit environment variable always comes first.
    explicit = os.environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so check for it
        # and fall back to the USER_BASE attribute.
        userbase: str | None
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "etc", "jupyter"))
            if candidate not in user_dirs:
                user_dirs.append(candidate)

    # Windows usually has no separate 'system' prefix, so 'system' and 'env'
    # coincide; keep env intact in that case so it can still be preferred.
    if ENV_CONFIG_PATH == SYSTEM_CONFIG_PATH:
        env_dirs = ENV_CONFIG_PATH
    else:
        env_dirs = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    # Environment and user directories, ordered by preference.
    if prefer_environment_over_user():
        ordered_groups = (env_dirs, user_dirs)
    else:
        ordered_groups = (user_dirs, env_dirs)
    for group in ordered_groups:
        search.extend(group)

    # System path last, unless it is the very same list as the env path.
    if ENV_CONFIG_PATH != SYSTEM_CONFIG_PATH:
        search.extend(SYSTEM_CONFIG_PATH)
    return search

426 

427 

def exists(path: str) -> bool:
    """Report whether ``path`` exists, without following a final symlink.

    Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers.  The path counts as existing when ``os.lstat``
    succeeds and as missing when it raises ``OSError``.
    """
    try:
        os.lstat(path)
    except OSError:
        return False
    else:
        return True

437 

438 

def is_file_hidden_win(abs_path: str | Path, stat_res: Any | None = None) -> bool:
    """Is a file hidden, by Windows rules?

    Only the file itself is examined; combine this with a check of the
    containing directory, or use is_hidden() to check the file together
    with its parent directories.

    A file counts as hidden when its name starts with a dot or when its
    stat result carries the FILE_ATTRIBUTE_HIDDEN attribute.  A file that
    does not exist is reported as not hidden.

    Parameters
    ----------
    abs_path : str or Path
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.

    Raises
    ------
    OSError
        If stat() fails for any reason other than the file being missing.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = target.stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    try:
        hidden_bits = (
            stat_res.st_file_attributes  # type:ignore[union-attr]
            & stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
        )
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        return False

    return bool(hidden_bits)

483 

484 

def is_file_hidden_posix(abs_path: str | Path, stat_res: Any | None = None) -> bool:
    """Is a file hidden, by POSIX rules?

    Only the file itself is examined; combine this with a check of the
    containing directory, or use is_hidden() to check the file together
    with its parent directories.

    A file counts as hidden when its name starts with a dot, when it is a
    directory the process cannot both read and traverse, or when its stat
    flags include UF_HIDDEN.  A file that does not exist is reported as not
    hidden.

    Parameters
    ----------
    abs_path : str or Path
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, or if it
        describes a symlink, this function will call stat() internally.

    Raises
    ------
    OSError
        If stat() fails for any reason other than the file being missing.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    # A missing result, or one describing a symlink, is replaced by a fresh
    # stat() of the path itself.
    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = target.stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    # Directories must be listable; test with x/r access rather than an
    # actual listing, in case of slow/large listings.
    is_dir = stat.S_ISDIR(stat_res.st_mode)
    if is_dir and not os.access(target, os.X_OK | os.R_OK):
        return True

    # Finally the UF_HIDDEN flag, on platforms whose stat results have flags.
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)

524 

525 

# Platform-appropriate single-file hidden check: the Windows implementation on
# win32, the POSIX implementation everywhere else.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix

530 

531 

def is_hidden(abs_path: str | Path, abs_root: str | Path = "") -> bool:
    """Is a file hidden or contained in a hidden directory?

    This will start with the rightmost path element and work backwards to the
    given root to see if a path is hidden or in a hidden directory. Hidden is
    determined by is_file_hidden() for the path itself, by a name starting
    with '.' for any component below the root, or by the UF_HIDDEN flag as
    reported by stat on any directory between the path and the root.

    If abs_path is the same directory as abs_root, it will be visible even if
    that is a hidden folder. This only checks the visibility of files
    and directories *within* abs_root.

    Parameters
    ----------
    abs_path : str or Path
        The absolute path to check for hidden directories.
    abs_root : str or Path
        The absolute path of the root directory in which hidden directories
        should be checked for.  When empty, the filesystem root (anchor) of
        abs_path is used.

    Returns
    -------
    bool
        True if the path or one of its directories below abs_root is hidden.

    Raises
    ------
    ValueError
        If abs_path or abs_root is not absolute, or abs_path is not located
        within abs_root.
    """
    abs_path = Path(os.path.normpath(abs_path))
    if abs_root:
        abs_root = Path(os.path.normpath(abs_root))
    else:
        # Default to the filesystem root of abs_path.  The anchor is used
        # rather than the last entry of ``parents`` because a path that is
        # itself a root (e.g. "/") has no parents at all.
        abs_root = Path(abs_path.anchor)

    if abs_path == abs_root:
        # root itself is never hidden
        return False

    # check that arguments are valid
    if not abs_path.is_absolute():
        _msg = f"{abs_path=} is not absolute. abs_path must be absolute."
        raise ValueError(_msg)
    if not abs_root.is_absolute():
        _msg = f"{abs_root=} is not absolute. abs_root must be absolute."
        raise ValueError(_msg)
    if not abs_path.is_relative_to(abs_root):
        _msg = (
            f"{abs_path=} is not a subdirectory of {abs_root=}. abs_path must be within abs_root."
        )
        raise ValueError(_msg)

    if is_file_hidden(abs_path):
        return True

    relative_path = abs_path.relative_to(abs_root)
    if any(part.startswith(".") for part in relative_path.parts):
        return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    for parent in abs_path.parents:
        # Stop at the root before anything else, so that a root which does
        # not exist on disk cannot cause directories above it to be inspected.
        if parent == abs_root:
            break
        if not parent.exists():
            continue
        try:
            # may fail on Windows junctions
            st = parent.lstat()
        except OSError:
            return True
        if getattr(st, "st_flags", 0) & UF_HIDDEN:
            return True

    return False

598 

599 

def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict a Windows file so only the current user and administrators can access it.

    The file's DACL is replaced by one holding exactly two entries: the
    current user receives generic read, generic write and delete rights,
    and the built-in Administrators group receives full access.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    When the pywin32 modules cannot be imported, the ctypes-based
    implementation is used instead.

    Parameters
    ----------

    fname : str
        The path to the file to secure
    """
    try:
        import win32api  # noqa: PLC0415
    except ImportError:
        return _win32_restrict_file_to_user_ctypes(fname)

    import ntsecuritycon as con  # noqa: PLC0415
    import win32security  # noqa: PLC0415

    # Resolve the two principals that keep access; no entry is added for "Everyone".
    admins_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user_sid, _domain, _type = win32security.LookupAccountName("", account_name)

    descriptor = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)

    # Build a fresh access control list from scratch.
    acl = win32security.ACL()
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    acl.AddAccessAllowedAce(win32security.ACL_REVISION, user_rights, user_sid)
    acl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins_sid)

    # Install the new list on the descriptor and write it back to the file.
    descriptor.SetSecurityDescriptorDacl(1, acl, 0)
    win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, descriptor)
    return None

642 

643 

644def _win32_restrict_file_to_user_ctypes(fname: str) -> None: 

645 """Secure a windows file to read-only access for the user. 

646 

647 Follows guidance from win32 library creator: 

648 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 

649 

650 This method should be executed against an already generated file which 

651 has no secrets written to it yet. 

652 

653 Parameters 

654 ---------- 

655 

656 fname : unicode 

657 The path to the file to secure 

658 """ 

659 import ctypes # noqa: PLC0415 

660 from ctypes import wintypes # noqa: PLC0415 

661 

662 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined] 

663 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined] 

664 

665 NameSamCompatible = 2 

666 WinBuiltinAdministratorsSid = 26 

667 DACL_SECURITY_INFORMATION = 4 

668 ACL_REVISION = 2 

669 ERROR_INSUFFICIENT_BUFFER = 122 

670 ERROR_MORE_DATA = 234 

671 

672 SYNCHRONIZE = 0x100000 

673 DELETE = 0x00010000 

674 STANDARD_RIGHTS_REQUIRED = 0xF0000 

675 STANDARD_RIGHTS_READ = 0x20000 

676 STANDARD_RIGHTS_WRITE = 0x20000 

677 FILE_READ_DATA = 1 

678 FILE_READ_EA = 8 

679 FILE_READ_ATTRIBUTES = 128 

680 FILE_WRITE_DATA = 2 

681 FILE_APPEND_DATA = 4 

682 FILE_WRITE_EA = 16 

683 FILE_WRITE_ATTRIBUTES = 256 

684 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF 

685 FILE_GENERIC_READ = ( 

686 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE 

687 ) 

688 FILE_GENERIC_WRITE = ( 

689 STANDARD_RIGHTS_WRITE 

690 | FILE_WRITE_DATA 

691 | FILE_WRITE_ATTRIBUTES 

692 | FILE_WRITE_EA 

693 | FILE_APPEND_DATA 

694 | SYNCHRONIZE 

695 ) 

696 

697 class ACL(ctypes.Structure): 

698 _fields_ = [ 

699 ("AclRevision", wintypes.BYTE), 

700 ("Sbz1", wintypes.BYTE), 

701 ("AclSize", wintypes.WORD), 

702 ("AceCount", wintypes.WORD), 

703 ("Sbz2", wintypes.WORD), 

704 ] 

705 

706 PSID = ctypes.c_void_p 

707 PACL = ctypes.POINTER(ACL) 

708 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE) 

709 

710 def _nonzero_success(result: int, func: Any, args: Any) -> Any: # noqa: ARG001 

711 if not result: 

712 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined] 

713 return args 

714 

715 secur32.GetUserNameExW.errcheck = _nonzero_success 

716 secur32.GetUserNameExW.restype = wintypes.BOOL 

717 secur32.GetUserNameExW.argtypes = ( 

718 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat 

719 wintypes.LPWSTR, # LPWSTR lpNameBuffer, 

720 wintypes.PULONG, # PULONG nSize 

721 ) 

722 

723 advapi32.CreateWellKnownSid.errcheck = _nonzero_success 

724 advapi32.CreateWellKnownSid.restype = wintypes.BOOL 

725 advapi32.CreateWellKnownSid.argtypes = ( 

726 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType 

727 PSID, # PSID DomainSid 

728 PSID, # PSID pSid 

729 wintypes.PDWORD, # DWORD *cbSid 

730 ) 

731 

732 advapi32.LookupAccountNameW.errcheck = _nonzero_success 

733 advapi32.LookupAccountNameW.restype = wintypes.BOOL 

734 advapi32.LookupAccountNameW.argtypes = ( 

735 wintypes.LPWSTR, # LPCWSTR lpSystemName 

736 wintypes.LPWSTR, # LPCWSTR lpAccountName 

737 PSID, # PSID Sid 

738 wintypes.LPDWORD, # LPDWORD cbSid 

739 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName 

740 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName 

741 wintypes.LPDWORD, # PSID_NAME_USE peUse 

742 ) 

743 

744 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success 

745 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL 

746 advapi32.AddAccessAllowedAce.argtypes = ( 

747 PACL, # PACL pAcl 

748 wintypes.DWORD, # DWORD dwAceRevision 

749 wintypes.DWORD, # DWORD AccessMask 

750 PSID, # PSID pSid 

751 ) 

752 

753 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success 

754 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL 

755 advapi32.SetSecurityDescriptorDacl.argtypes = ( 

756 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

757 wintypes.BOOL, # BOOL bDaclPresent 

758 PACL, # PACL pDacl 

759 wintypes.BOOL, # BOOL bDaclDefaulted 

760 ) 

761 

762 advapi32.GetFileSecurityW.errcheck = _nonzero_success 

763 advapi32.GetFileSecurityW.restype = wintypes.BOOL 

764 advapi32.GetFileSecurityW.argtypes = ( 

765 wintypes.LPCWSTR, # LPCWSTR lpFileName 

766 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation 

767 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

768 wintypes.DWORD, # DWORD nLength 

769 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded 

770 ) 

771 

772 advapi32.SetFileSecurityW.errcheck = _nonzero_success 

773 advapi32.SetFileSecurityW.restype = wintypes.BOOL 

774 advapi32.SetFileSecurityW.argtypes = ( 

775 wintypes.LPCWSTR, # LPCWSTR lpFileName 

776 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation 

777 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

778 ) 

779 

780 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success 

781 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL 

782 advapi32.MakeAbsoluteSD.argtypes = ( 

783 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

784 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

785 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize 

786 PACL, # PACL pDacl 

787 wintypes.LPDWORD, # LPDWORD lpdwDaclSize 

788 PACL, # PACL pSacl 

789 wintypes.LPDWORD, # LPDWORD lpdwSaclSize 

790 PSID, # PSID pOwner 

791 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize 

792 PSID, # PSID pPrimaryGroup 

793 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize 

794 ) 

795 

796 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success 

797 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL 

798 advapi32.MakeSelfRelativeSD.argtypes = ( 

799 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

800 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

801 wintypes.LPDWORD, # LPDWORD lpdwBufferLength 

802 ) 

803 

804 advapi32.InitializeAcl.errcheck = _nonzero_success 

805 advapi32.InitializeAcl.restype = wintypes.BOOL 

806 advapi32.InitializeAcl.argtypes = ( 

807 PACL, # PACL pAcl, 

808 wintypes.DWORD, # DWORD nAclLength, 

809 wintypes.DWORD, # DWORD dwAclRevision 

810 ) 

811 

812 def CreateWellKnownSid(WellKnownSidType: Any) -> Any: 

813 # return a SID for predefined aliases 

814 pSid = (ctypes.c_char * 1)() 

815 cbSid = wintypes.DWORD() 

816 try: 

817 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

818 except OSError as e: 

819 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

820 raise 

821 pSid = (ctypes.c_char * cbSid.value)() 

822 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

823 return pSid[:] 

824 

825 def GetUserNameEx(NameFormat: Any) -> Any: 

826 # return the user or other security principal associated with 

827 # the calling thread 

828 nSize = ctypes.pointer(ctypes.c_ulong(0)) 

829 try: 

830 secur32.GetUserNameExW(NameFormat, None, nSize) 

831 except OSError as e: 

832 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined] 

833 raise 

834 if not nSize.contents.value: 

835 return None 

836 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value) 

837 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize) 

838 return lpNameBuffer.value 

839 

840 def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any: 

841 # return a security identifier (SID) for an account on a system 

842 # and the name of the domain on which the account was found 

843 cbSid = wintypes.DWORD(0) 

844 cchReferencedDomainName = wintypes.DWORD(0) 

845 peUse = wintypes.DWORD(0) 

846 try: 

847 advapi32.LookupAccountNameW( 

848 lpSystemName, 

849 lpAccountName, 

850 None, 

851 ctypes.byref(cbSid), 

852 None, 

853 ctypes.byref(cchReferencedDomainName), 

854 ctypes.byref(peUse), 

855 ) 

856 except OSError as e: 

857 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

858 raise 

859 Sid = ctypes.create_unicode_buffer("", cbSid.value) 

860 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID) 

861 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1) 

862 success = advapi32.LookupAccountNameW( 

863 lpSystemName, 

864 lpAccountName, 

865 pSid, 

866 ctypes.byref(cbSid), 

867 lpReferencedDomainName, 

868 ctypes.byref(cchReferencedDomainName), 

869 ctypes.byref(peUse), 

870 ) 

871 if not success: 

872 raise ctypes.WinError() # type:ignore[attr-defined] 

873 return pSid, lpReferencedDomainName.value, peUse.value 

874 

875 def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any: 

876 # add an access-allowed access control entry (ACE) 

877 # to an access control list (ACL) 

878 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid) 

879 

880 def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any: 

881 # return information about the security of a file or directory 

882 nLength = wintypes.DWORD(0) 

883 try: 

884 advapi32.GetFileSecurityW( 

885 lpFileName, 

886 RequestedInformation, 

887 None, 

888 0, 

889 ctypes.byref(nLength), 

890 ) 

891 except OSError as e: 

892 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

893 raise 

894 if not nLength.value: 

895 return None 

896 pSecurityDescriptor = (wintypes.BYTE * nLength.value)() 

897 advapi32.GetFileSecurityW( 

898 lpFileName, 

899 RequestedInformation, 

900 pSecurityDescriptor, 

901 nLength, 

902 ctypes.byref(nLength), 

903 ) 

904 return pSecurityDescriptor 

905 

906 def SetFileSecurity( 

907 lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any 

908 ) -> Any: 

909 # set the security of a file or directory object 

910 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor) 

911 

912 def SetSecurityDescriptorDacl( 

913 pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any 

914 ) -> Any: 

915 # set information in a discretionary access control list (DACL) 

916 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted) 

917 

918 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any: 

919 # return a security descriptor in absolute format 

920 # by using a security descriptor in self-relative format as a template 

921 pAbsoluteSecurityDescriptor = None 

922 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0) 

923 pDacl = None 

924 lpdwDaclSize = wintypes.DWORD(0) 

925 pSacl = None 

926 lpdwSaclSize = wintypes.DWORD(0) 

927 pOwner = None 

928 lpdwOwnerSize = wintypes.DWORD(0) 

929 pPrimaryGroup = None 

930 lpdwPrimaryGroupSize = wintypes.DWORD(0) 

931 try: 

932 advapi32.MakeAbsoluteSD( 

933 pSelfRelativeSecurityDescriptor, 

934 pAbsoluteSecurityDescriptor, 

935 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 

936 pDacl, 

937 ctypes.byref(lpdwDaclSize), 

938 pSacl, 

939 ctypes.byref(lpdwSaclSize), 

940 pOwner, 

941 ctypes.byref(lpdwOwnerSize), 

942 pPrimaryGroup, 

943 ctypes.byref(lpdwPrimaryGroupSize), 

944 ) 

945 except OSError as e: 

946 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

947 raise 

948 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)() 

949 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)() 

950 pDacl = ctypes.cast(pDaclData, PACL).contents 

951 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)() 

952 pSacl = ctypes.cast(pSaclData, PACL).contents 

953 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)() 

954 pOwner = ctypes.cast(pOwnerData, PSID) 

955 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)() 

956 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID) 

957 advapi32.MakeAbsoluteSD( 

958 pSelfRelativeSecurityDescriptor, 

959 pAbsoluteSecurityDescriptor, 

960 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 

961 pDacl, 

962 ctypes.byref(lpdwDaclSize), 

963 pSacl, 

964 ctypes.byref(lpdwSaclSize), 

965 pOwner, 

966 lpdwOwnerSize, 

967 pPrimaryGroup, 

968 ctypes.byref(lpdwPrimaryGroupSize), 

969 ) 

970 return pAbsoluteSecurityDescriptor 

971 

972 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any: 

973 # return a security descriptor in self-relative format 

974 # by using a security descriptor in absolute format as a template 

975 pSelfRelativeSecurityDescriptor = None 

976 lpdwBufferLength = wintypes.DWORD(0) 

977 try: 

978 advapi32.MakeSelfRelativeSD( 

979 pAbsoluteSecurityDescriptor, 

980 pSelfRelativeSecurityDescriptor, 

981 ctypes.byref(lpdwBufferLength), 

982 ) 

983 except OSError as e: 

984 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

985 raise 

986 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)() 

987 advapi32.MakeSelfRelativeSD( 

988 pAbsoluteSecurityDescriptor, 

989 pSelfRelativeSecurityDescriptor, 

990 ctypes.byref(lpdwBufferLength), 

991 ) 

992 return pSelfRelativeSecurityDescriptor 

993 

994 def NewAcl() -> Any: 

995 # return a new, initialized ACL (access control list) structure 

996 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ? 

997 acl_data = ctypes.create_string_buffer(nAclLength) 

998 pAcl = ctypes.cast(acl_data, PACL).contents 

999 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION) 

1000 return pAcl 

1001 

1002 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid) 

1003 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0] 

1004 

1005 Acl = NewAcl() 

1006 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins) 

1007 AddAccessAllowedAce( 

1008 Acl, 

1009 ACL_REVISION, 

1010 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE, 

1011 SidUser, 

1012 ) 

1013 

1014 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION) 

1015 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD) 

1016 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0) 

1017 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD) 

1018 

1019 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD) 

1020 

1021 

def get_file_mode(fname: str) -> int:
    """Return the permission bits of ``fname`` in a filesystem-tolerant manner.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    Returns
    -------
    int
        The file's mode bits with the owner-execute bit and the sticky bit
        cleared.
    """
    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so the
    # owner's execute bit is masked out (third octal digit is 6 rather than 7).
    # The sticky bit is tolerated as well, so it is masked out too (fourth octal
    # digit is 6 rather than 7).
    tolerant_mask = 0o6677
    raw_mode = Path(fname).stat().st_mode
    # S_IMODE yields four octal digits, hence the four-digit mask.
    permission_bits = stat.S_IMODE(raw_mode)
    return permission_bits & tolerant_mask

1039 

1040 

# Insecure writes are allowed only when JUPYTER_ALLOW_INSECURE_WRITES is set to
# "true" or "1" (case-insensitive); unset or any other value keeps them disabled.
# The variable is read once, when this module is imported.
allow_insecure_writes = os.environ.get("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in (
    "true",
    "1",
)

1042 

1043 

@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open a file for writing in the most restricted pattern available.

    The file mode is limited to `0o0600` and the resulting opened file
    handle is yielded. Any existing file at ``fname`` is removed first.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary

    Raises
    ------
    RuntimeError
        On non-Windows systems, when the created file's mode is not `0o0600`
        and insecure writes have not been allowed.
    """
    if binary:
        open_mode, text_encoding = "wb", None
    else:
        open_mode, text_encoding = "w", "utf-8"
    flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    try:
        Path(fname).unlink()
    except OSError:
        # Skip any issues with the file not existing
        pass

    on_windows = os.name == "nt"

    if on_windows:
        if not allow_insecure_writes:
            # Python on windows does not respect the group and public bits for chmod, so
            # additional steps are needed to secure the contents.
            # Touch the file preemptively to avoid editing permissions on an open file.
            os.close(os.open(fname, flags, 0o0600))
            # The file now exists, so the real open below must not create it again.
            flags = os.O_WRONLY | os.O_TRUNC
            win32_restrict_file_to_user(fname)
        else:
            # Mounted file systems can have a number of failure modes in the branch above.
            # For windows machines in insecure mode it is simply skipped to avoid failures.
            issue_insecure_write_warning()

    with os.fdopen(os.open(fname, flags, 0o0600), open_mode, encoding=text_encoding) as f:
        if not on_windows:
            # Enforce that the file got the requested permissions before writing
            actual_mode = get_file_mode(fname)
            if actual_mode != 0o0600:
                if not allow_insecure_writes:
                    msg = (
                        f"Permissions assignment failed for secure file: '{fname}'."
                        f" Got '{oct(actual_mode)}' instead of '0o0600'."
                    )
                    raise RuntimeError(msg)
                issue_insecure_write_warning()
        yield f

1096 

1097 

def issue_insecure_write_warning() -> None:
    """Issue an insecure write warning.

    The warning is emitted with :func:`warnings.warn`. While it is being
    emitted, ``warnings.formatwarning`` is swapped for a formatter that
    renders only the message text followed by a newline. The previous
    formatter is put back afterwards, so other warnings raised later in
    the process keep their normal formatting.
    """

    def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:  # noqa: ARG001
        return str(msg) + "\n"

    previous_formatter = warnings.formatwarning
    warnings.formatwarning = format_warning  # type:ignore[assignment]
    try:
        warnings.warn(
            "WARNING: Insecure writes have been enabled via environment variable "
            "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
            "variable or set its value to 'False'.",
            stacklevel=2,
        )
    finally:
        # Restore the global formatter even if a filter turned the warning into an error.
        warnings.formatwarning = previous_formatter