Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_core/paths.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

460 statements  

1"""Path utility functions.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6# Derived from IPython.utils.path, which is 

7# Copyright (c) IPython Development Team. 

8# Distributed under the terms of the Modified BSD License. 

9from __future__ import annotations 

10 

11import errno 

12import os 

13import site 

14import stat 

15import sys 

16import tempfile 

17import warnings 

18from contextlib import contextmanager 

19from pathlib import Path 

20from typing import Any, Iterator, Literal, Optional, overload 

21 

22import platformdirs 

23 

24from .utils import deprecation 

25 

# Short alias for joining path components, used throughout this module.
pjoin = os.path.join

# Capitalize Jupyter in paths only on Windows and MacOS (when not in Homebrew)
APPNAME = (
    "Jupyter"
    if sys.platform == "win32"
    or (sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew"))
    else "jupyter"
)

# BSD-style stat flag marking a file as hidden. Take it from the stat module
# when available, otherwise fall back to its numeric value (0x8000 == 32768).
UF_HIDDEN = getattr(stat, "UF_HIDDEN", 0x8000)

39 

40 

@overload
def envset(name: str, default: bool = False) -> bool: ...


@overload
def envset(name: str, default: Literal[None]) -> Optional[bool]: ...


def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
    """Interpret an environment variable as a boolean flag.

    Any value counts as "set" unless it is one of 'no', 'n', 'false',
    'off', '0', or '0.0' (compared case-insensitively).

    When the variable is not defined at all, ``default`` is returned as is.
    """
    try:
        raw_value = os.environ[name]
    except KeyError:
        return default

    return raw_value.lower() not in ("no", "n", "false", "off", "0", "0.0")

61 

62 

def use_platform_dirs() -> bool:
    """Report whether platformdirs should supply the system-specific paths.

    The answer comes from the JUPYTER_PLATFORM_DIRS environment variable and
    is False when that variable is unset.

    We plan for this to default to False in jupyter_core version 5 and to True
    in jupyter_core version 6.
    """
    opted_in = envset("JUPYTER_PLATFORM_DIRS", default=False)
    return opted_in

70 

71 

def get_home_dir() -> str:
    """Return the user's home directory with symlinks resolved."""
    expanded = Path("~").expanduser()
    # Resolving makes things work even when /home/ is a symlink to
    # /usr/home, as it is on FreeBSD, for example.
    return str(expanded.resolve())

78 

79 

# Per-process cache of temporary directories handed out by _mkdtemp_once,
# keyed by the name passed to it, so repeated requests reuse one directory.
_dtemps: dict[str, str] = {}

81 

82 

def _do_i_own(path: str) -> bool:
    """Return whether the current user owns the given path.

    If ``path`` does not exist, its nearest existing ancestor is checked
    instead. Three strategies are tried in order: comparing the owner name
    with the login name, comparing the owner uid with the effective uid, and
    finally falling back to a plain write-access check.
    """
    candidate = Path(path).resolve()

    # climb to the first ancestor that exists (stop at the filesystem root)
    while True:
        if candidate.exists() or candidate == candidate.parent:
            break
        candidate = candidate.parent

    # simplest check: owner by name
    # not always implemented or available
    try:
        same_name = candidate.owner() == os.getlogin()
    except Exception:  # noqa: S110
        pass
    else:
        return same_name

    if hasattr(os, "geteuid"):
        try:
            return candidate.stat().st_uid == os.geteuid()
        except (NotImplementedError, OSError):
            # geteuid not always implemented
            pass

    # no ownership checks worked, check write access
    return os.access(candidate, os.W_OK)

108 

109 

def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths.

    An explicit JUPYTER_PREFER_ENV_PATH setting always wins. Otherwise the
    answer is True inside a virtualenv, or inside a non-base conda/mamba
    environment, provided the current user owns ``sys.prefix``.
    """
    environ = os.environ
    prefix = sys.prefix

    # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value
    if "JUPYTER_PREFER_ENV_PATH" in environ:
        return envset("JUPYTER_PREFER_ENV_PATH")

    # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def)
    in_virtualenv = prefix != sys.base_prefix
    if in_virtualenv and _do_i_own(prefix):
        return True

    # If sys.prefix indicates Python comes from a conda/mamba environment
    # that is not the root environment, default to True
    conda_prefix = environ.get("CONDA_PREFIX")
    if conda_prefix is None or not prefix.startswith(conda_prefix):
        return False
    if environ.get("CONDA_DEFAULT_ENV", "base") == "base":
        return False
    return _do_i_own(prefix)

130 

131 

def _mkdtemp_once(name: str) -> str:
    """Create a temporary directory for ``name``, or reuse the existing one.

    Calling this again with the same name in the same process returns the
    directory created by the first call.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]

143 

144 

def jupyter_config_dir() -> str:
    """Return the Jupyter config directory for this platform and user.

    Lookup order:

    - a reusable temporary directory if JUPYTER_NO_CONFIG is set (non-empty)
    - JUPYTER_CONFIG_DIR if defined
    - the platformdirs user config directory when platformdirs is enabled
    - ``~/.jupyter`` otherwise
    """
    environ = os.environ

    if environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit = environ.get("JUPYTER_CONFIG_DIR")
    if explicit:
        return explicit

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")

164 

165 

def jupyter_data_dir() -> str:
    """Return the directory for Jupyter data files for this platform and user.

    These are non-transient, non-configuration files.

    Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path:
    the platformdirs user data directory when platformdirs is enabled,
    ``~/Library/Jupyter`` on macOS, ``%APPDATA%/jupyter`` on Windows (or
    ``<config dir>/data`` when APPDATA is unset), and
    ``$XDG_DATA_HOME/jupyter`` (default ``~/.local/share/jupyter``) elsewhere.
    """
    environ = os.environ

    explicit = environ.get("JUPYTER_DATA_DIR")
    if explicit:
        return explicit

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()
    platform = sys.platform

    if platform == "darwin":
        return str(Path(home, "Library", "Jupyter"))

    if platform == "win32":
        appdata = environ.get("APPDATA")
        if not appdata:
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.
    xdg_home = environ.get("XDG_DATA_HOME") or pjoin(home, ".local", "share")
    return pjoin(xdg_home, "jupyter")

195 

196 

def jupyter_runtime_dir() -> str:
    """Return the directory used for transient Jupyter runtime files.

    JUPYTER_RUNTIME_DIR is returned if defined.

    The default is (data_dir)/runtime on all platforms;
    XDG_RUNTIME_DIR is no longer used after various problems.
    """
    explicit = os.environ.get("JUPYTER_RUNTIME_DIR")
    return explicit if explicit else pjoin(jupyter_data_dir(), "runtime")

211 

212 

# %PROGRAMDATA% is not safe by default, require opt-in to trust it
_use_programdata: bool = envset("JUPYTER_USE_PROGRAMDATA")
# _win_programdata is a path str if we're using it, None otherwise
_win_programdata: str | None = (
    os.environ.get("PROGRAMDATA") if os.name == "nt" and _use_programdata else None
)


if use_platform_dirs():
    if os.name != "nt" or _use_programdata:
        SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
            APPNAME, appauthor=False, multipath=True
        ).split(os.pathsep)
    else:
        # default PROGRAMDATA used by site_* is not safe by default on Windows
        SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
else:
    deprecation(
        "Jupyter is migrating its paths to use standard platformdirs\n"
        "given by the platformdirs library. To remove this warning and\n"
        "see the appropriate new directories, set the environment variable\n"
        "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
        "The use of platformdirs will be the default in `jupyter_core` v6"
    )
    if os.name != "nt":
        SYSTEM_JUPYTER_PATH = [
            "/usr/local/share/jupyter",
            "/usr/share/jupyter",
        ]
    elif _win_programdata:
        # PROGRAMDATA is not defined by default on XP, and not safe by default
        SYSTEM_JUPYTER_PATH = [pjoin(_win_programdata, "jupyter")]
    else:
        SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]

# Environment-level data directory, rooted at the running interpreter's prefix.
ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]

250 

251 

def jupyter_path(*subdirs: str) -> list[str]:
    """Return the ordered list of directories searched for data files.

    Paths come from four sources:

    - the $JUPYTER_PATH environment variable (always highest priority)
    - user directories (e.g. ~/.local/share/jupyter)
    - environment directories (e.g. {sys.prefix}/share/jupyter)
    - system-wide paths (e.g. /usr/local/share/jupyter)

    JUPYTER_PATH, if defined, is purely additive and always comes first.

    If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level
    directories will have priority over user-level directories.
    You can also set JUPYTER_PREFER_ENV_PATH=0 to explicitly prefer user directories.
    If Jupyter detects that you are in a virtualenv or conda environment,
    environment paths are also preferred to user paths,
    otherwise user paths are preferred to environment paths.

    If the Python site.ENABLE_USER_SITE variable is True, the matching Python
    user site subdirectory is added to the user-level directories.

    System-wide directories, such as `/usr/local/share/jupyter`, are searched last.

    If ``*subdirs`` are given, that subdirectory is appended to each element.


    .. versionchanged:: 5.8

        On Windows, %PROGRAMDATA% will be used as a system-wide path only if
        the JUPYTER_USE_PROGRAMDATA env is set.
        By default, there is no default system-wide path on Windows and the env path
        is used instead.

    Examples:

    >>> jupyter_path()
    ['~/.local/share/jupyter', '/usr/local/share/jupyter']
    >>> jupyter_path('kernels')
    ['~/.local/share/jupyter/kernels', '/usr/local/share/jupyter/kernels']
    """
    search: list[str] = []

    # highest priority is explicit environment variable
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
    user_dirs = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # Check if site.getuserbase() exists to be compatible with virtualenv,
        # which often does not have this method.
        userbase: Optional[str]
        if hasattr(site, "getuserbase"):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        if userbase:
            user_share = str(Path(userbase, "share", "jupyter"))
            if user_share not in user_dirs:
                user_dirs.append(user_share)

    # Windows usually doesn't have a 'system' prefix,
    # so 'system' and 'env' are the same
    # make sure that env can still be preferred in this case
    if ENV_JUPYTER_PATH == SYSTEM_JUPYTER_PATH:
        env_dirs = ENV_JUPYTER_PATH
    else:
        env_dirs = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    if prefer_environment_over_user():
        ordered_groups = (env_dirs, user_dirs)
    else:
        ordered_groups = (user_dirs, env_dirs)
    for group in ordered_groups:
        search.extend(group)

    # finally, add system paths (can overlap with env, so avoid duplicates)
    for system_dir in SYSTEM_JUPYTER_PATH:
        if system_dir not in search:
            search.append(system_dir)

    # add subdir, if requested
    if not subdirs:
        return search
    return [pjoin(base, *subdirs) for base in search]

338 

339 

# Environment-level config directory, rooted at the running interpreter's prefix.
ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]

if use_platform_dirs():
    if os.name != "nt" or _use_programdata:
        SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
            APPNAME, appauthor=False, multipath=True
        ).split(os.pathsep)
    else:
        # default PROGRAMDATA is not safe by default on Windows
        # use ENV to avoid an empty list, since some may assume this is non-empty
        SYSTEM_CONFIG_PATH = list(ENV_CONFIG_PATH)
elif os.name != "nt":
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]
elif _win_programdata:
    # PROGRAMDATA is not defined by default on XP, and not safe by default
    SYSTEM_CONFIG_PATH = [str(Path(_win_programdata, "jupyter"))]
else:
    # but make sure it's not empty
    SYSTEM_CONFIG_PATH = list(ENV_CONFIG_PATH)

363 

364 

def jupyter_config_path() -> list[str]:
    """Return the ordered search path for Jupyter config files.

    When JUPYTER_NO_CONFIG is set, the result is just the single directory
    returned by ``jupyter_config_dir()``.

    Otherwise entries from JUPYTER_CONFIG_PATH come first, followed by the
    user-level and environment-level directories. If the
    JUPYTER_PREFER_ENV_PATH environment variable is set, the
    environment-level directories will have priority over user-level
    directories.

    If the Python site.ENABLE_USER_SITE variable is True, the matching Python
    user site subdirectory is added to the user-level directories.

    System-wide directories such as `/usr/local/etc/jupyter` are searched last.


    .. versionchanged:: 5.8

        On Windows, %PROGRAMDATA% will be used as a system-wide path only if
        the JUPYTER_USE_PROGRAMDATA env is set.
        By default, there is no system-wide config path on Windows.

    Examples:

    >>> jupyter_config_path()
    ['~/.jupyter', '~/.local/etc/jupyter', '/usr/local/etc/jupyter', '/etc/jupyter']

    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    search: list[str] = []

    # highest priority is explicit environment variable
    explicit = os.environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        search += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag
    user_dirs = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        # Check if site.getuserbase() exists to be compatible with virtualenv,
        # which often does not have this method.
        userbase: Optional[str]
        if hasattr(site, "getuserbase"):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        if userbase:
            user_etc = str(Path(userbase, "etc", "jupyter"))
            if user_etc not in user_dirs:
                user_dirs.append(user_etc)

    # Windows usually doesn't have a 'system' prefix,
    # so 'system' and 'env' are the same
    # make sure that env can still be preferred in this case
    env_is_system = ENV_CONFIG_PATH == SYSTEM_CONFIG_PATH
    if env_is_system:
        env_dirs = ENV_CONFIG_PATH
    else:
        env_dirs = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    if prefer_environment_over_user():
        ordered_groups = (env_dirs, user_dirs)
    else:
        ordered_groups = (user_dirs, env_dirs)
    for group in ordered_groups:
        search.extend(group)

    # Finally, system path (skipped when it is the same list as env)
    if not env_is_system:
        search.extend(SYSTEM_CONFIG_PATH)
    return search

432 

433 

def exists(path: str) -> bool:
    """Report whether ``path`` exists, judged by whether ``os.lstat`` succeeds.

    Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers.
    """
    try:
        os.lstat(path)
    except OSError:
        found = False
    else:
        found = True
    return found

443 

444 

def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Tell whether a single file counts as hidden (Windows rules).

    A file is hidden when its name starts with a dot or when its stat result
    carries the FILE_ATTRIBUTE_HIDDEN attribute. Only the file itself is
    examined; use is_hidden() to check the file and its parent directories.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.

    Returns
    -------
    bool
        True if the file is hidden. False otherwise, including when the file
        does not exist or the hidden attribute cannot be read.
    """
    target = Path(abs_path)
    if target.name.startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = target.stat()
        except OSError as exc:
            # a missing file is simply "not hidden"; anything else propagates
            if exc.errno != errno.ENOENT:
                raise
            return False

    try:
        hidden_bits = (
            stat_res.st_file_attributes  # type:ignore[union-attr]
            & stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
        )
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        return False

    return bool(hidden_bits)

488 

489 

def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Tell whether a single file counts as hidden (POSIX rules).

    A file is hidden when its name starts with a dot, when it is a directory
    that cannot be read and traversed, or when its stat flags include
    UF_HIDDEN. Only the file itself is examined; use is_hidden() to check the
    file and its parent directories.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, or if it
        describes a symlink, this function will call stat() internally.

    Returns
    -------
    bool
        True if the file is hidden. False otherwise, including when the file
        does not exist.
    """
    if Path(abs_path).name.startswith("."):
        return True

    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = Path(abs_path).stat()
        except OSError as exc:
            # a missing file is simply "not hidden"; anything else propagates
            if exc.errno != errno.ENOENT:
                raise
            return False

    # check that dirs can be listed
    # use x-access, not actual listing, in case of slow/large listings
    unlistable_dir = stat.S_ISDIR(stat_res.st_mode) and not os.access(
        abs_path, os.X_OK | os.R_OK
    )
    if unlistable_dir:
        return True

    # check UF_HIDDEN
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)

528 

529 

# Bind the single-file hidden check that matches the running platform.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix

534 

535 

def is_hidden(abs_path: str, abs_root: str = "") -> bool:
    """Is a file hidden or contained in a hidden directory?

    This will start with the rightmost path element and work backwards to the
    given root to see if a path is hidden or in a hidden directory. Hidden is
    determined by either name starting with '.' or the UF_HIDDEN flag as
    reported by stat.

    If abs_path is the same directory as abs_root, it will be visible even if
    that is a hidden folder. This only checks the visibility of files
    and directories *within* abs_root.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for. When empty (the default), the first component
        of abs_path (the filesystem root for an absolute POSIX path) is used.

    Returns
    -------
    bool
        True if the path, or any directory between it and the root, is hidden.
    """
    # Remember whether the caller supplied a root *before* normalizing:
    # os.path.normpath("") returns ".", so testing the normalized value for
    # emptiness can never succeed and the default-root fallback would be dead.
    root_given = bool(abs_root)

    abs_path = os.path.normpath(abs_path)
    abs_root = os.path.normpath(abs_root)

    if abs_path == abs_root:
        return False

    if is_file_hidden(abs_path):
        return True

    if not root_given:
        abs_root = abs_path.split(os.sep, 1)[0] + os.sep
    inside_root = abs_path[len(abs_root) :]
    if any(part.startswith(".") for part in Path(inside_root).parts):
        return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    path = str(Path(abs_path).parent)
    while path and path.startswith(abs_root) and path != abs_root:
        if Path(path).exists():
            try:
                # may fail on Windows junctions
                st = os.lstat(path)
            except OSError:
                return True
            if getattr(st, "st_flags", 0) & UF_HIDDEN:
                return True
        parent = str(Path(path).parent)
        if parent == path:
            # reached an anchor that is its own parent; nothing left to climb
            break
        path = parent

    return False

588 

589 

def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict a Windows file's access list to the current user and admins.

    A new DACL is installed on the file that grants the current user generic
    read, generic write and delete rights, and grants the built-in
    Administrators group full access.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    If the pywin32 modules cannot be imported, the ctypes-based
    implementation in this module is used instead.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        return _win32_restrict_file_to_user_ctypes(fname)

    import ntsecuritycon as con
    import win32security

    dacl_info = win32security.DACL_SECURITY_INFORMATION
    revision = win32security.ACL_REVISION

    admins_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user_sid, _domain, _type = win32security.LookupAccountName("", account_name)

    descriptor = win32security.GetFileSecurity(fname, dacl_info)

    # Build the replacement access list: one entry for the user, one for admins.
    acl = win32security.ACL()
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    acl.AddAccessAllowedAce(revision, user_rights, user_sid)
    acl.AddAccessAllowedAce(revision, con.FILE_ALL_ACCESS, admins_sid)

    descriptor.SetSecurityDescriptorDacl(1, acl, 0)
    win32security.SetFileSecurity(fname, dacl_info, descriptor)
    return None

632 

633 

634def _win32_restrict_file_to_user_ctypes(fname: str) -> None: 

635 """Secure a windows file to read-only access for the user. 

636 

637 Follows guidance from win32 library creator: 

638 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 

639 

640 This method should be executed against an already generated file which 

641 has no secrets written to it yet. 

642 

643 Parameters 

644 ---------- 

645 

646 fname : unicode 

647 The path to the file to secure 

648 """ 

649 import ctypes 

650 from ctypes import wintypes 

651 

652 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined] 

653 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined] 

654 

655 NameSamCompatible = 2 

656 WinBuiltinAdministratorsSid = 26 

657 DACL_SECURITY_INFORMATION = 4 

658 ACL_REVISION = 2 

659 ERROR_INSUFFICIENT_BUFFER = 122 

660 ERROR_MORE_DATA = 234 

661 

662 SYNCHRONIZE = 0x100000 

663 DELETE = 0x00010000 

664 STANDARD_RIGHTS_REQUIRED = 0xF0000 

665 STANDARD_RIGHTS_READ = 0x20000 

666 STANDARD_RIGHTS_WRITE = 0x20000 

667 FILE_READ_DATA = 1 

668 FILE_READ_EA = 8 

669 FILE_READ_ATTRIBUTES = 128 

670 FILE_WRITE_DATA = 2 

671 FILE_APPEND_DATA = 4 

672 FILE_WRITE_EA = 16 

673 FILE_WRITE_ATTRIBUTES = 256 

674 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF 

675 FILE_GENERIC_READ = ( 

676 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE 

677 ) 

678 FILE_GENERIC_WRITE = ( 

679 STANDARD_RIGHTS_WRITE 

680 | FILE_WRITE_DATA 

681 | FILE_WRITE_ATTRIBUTES 

682 | FILE_WRITE_EA 

683 | FILE_APPEND_DATA 

684 | SYNCHRONIZE 

685 ) 

686 

687 class ACL(ctypes.Structure): 

688 _fields_ = [ 

689 ("AclRevision", wintypes.BYTE), 

690 ("Sbz1", wintypes.BYTE), 

691 ("AclSize", wintypes.WORD), 

692 ("AceCount", wintypes.WORD), 

693 ("Sbz2", wintypes.WORD), 

694 ] 

695 

696 PSID = ctypes.c_void_p 

697 PACL = ctypes.POINTER(ACL) 

698 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE) 

699 

700 def _nonzero_success(result: int, func: Any, args: Any) -> Any: # noqa: ARG001 

701 if not result: 

702 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined] 

703 return args 

704 

705 secur32.GetUserNameExW.errcheck = _nonzero_success 

706 secur32.GetUserNameExW.restype = wintypes.BOOL 

707 secur32.GetUserNameExW.argtypes = ( 

708 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat 

709 wintypes.LPWSTR, # LPWSTR lpNameBuffer, 

710 wintypes.PULONG, # PULONG nSize 

711 ) 

712 

713 advapi32.CreateWellKnownSid.errcheck = _nonzero_success 

714 advapi32.CreateWellKnownSid.restype = wintypes.BOOL 

715 advapi32.CreateWellKnownSid.argtypes = ( 

716 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType 

717 PSID, # PSID DomainSid 

718 PSID, # PSID pSid 

719 wintypes.PDWORD, # DWORD *cbSid 

720 ) 

721 

722 advapi32.LookupAccountNameW.errcheck = _nonzero_success 

723 advapi32.LookupAccountNameW.restype = wintypes.BOOL 

724 advapi32.LookupAccountNameW.argtypes = ( 

725 wintypes.LPWSTR, # LPCWSTR lpSystemName 

726 wintypes.LPWSTR, # LPCWSTR lpAccountName 

727 PSID, # PSID Sid 

728 wintypes.LPDWORD, # LPDWORD cbSid 

729 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName 

730 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName 

731 wintypes.LPDWORD, # PSID_NAME_USE peUse 

732 ) 

733 

734 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success 

735 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL 

736 advapi32.AddAccessAllowedAce.argtypes = ( 

737 PACL, # PACL pAcl 

738 wintypes.DWORD, # DWORD dwAceRevision 

739 wintypes.DWORD, # DWORD AccessMask 

740 PSID, # PSID pSid 

741 ) 

742 

743 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success 

744 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL 

745 advapi32.SetSecurityDescriptorDacl.argtypes = ( 

746 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

747 wintypes.BOOL, # BOOL bDaclPresent 

748 PACL, # PACL pDacl 

749 wintypes.BOOL, # BOOL bDaclDefaulted 

750 ) 

751 

752 advapi32.GetFileSecurityW.errcheck = _nonzero_success 

753 advapi32.GetFileSecurityW.restype = wintypes.BOOL 

754 advapi32.GetFileSecurityW.argtypes = ( 

755 wintypes.LPCWSTR, # LPCWSTR lpFileName 

756 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation 

757 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

758 wintypes.DWORD, # DWORD nLength 

759 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded 

760 ) 

761 

762 advapi32.SetFileSecurityW.errcheck = _nonzero_success 

763 advapi32.SetFileSecurityW.restype = wintypes.BOOL 

764 advapi32.SetFileSecurityW.argtypes = ( 

765 wintypes.LPCWSTR, # LPCWSTR lpFileName 

766 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation 

767 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

768 ) 

769 

770 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success 

771 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL 

772 advapi32.MakeAbsoluteSD.argtypes = ( 

773 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

774 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

775 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize 

776 PACL, # PACL pDacl 

777 wintypes.LPDWORD, # LPDWORD lpdwDaclSize 

778 PACL, # PACL pSacl 

779 wintypes.LPDWORD, # LPDWORD lpdwSaclSize 

780 PSID, # PSID pOwner 

781 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize 

782 PSID, # PSID pPrimaryGroup 

783 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize 

784 ) 

785 

786 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success 

787 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL 

788 advapi32.MakeSelfRelativeSD.argtypes = ( 

789 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

790 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

791 wintypes.LPDWORD, # LPDWORD lpdwBufferLength 

792 ) 

793 

794 advapi32.InitializeAcl.errcheck = _nonzero_success 

795 advapi32.InitializeAcl.restype = wintypes.BOOL 

796 advapi32.InitializeAcl.argtypes = ( 

797 PACL, # PACL pAcl, 

798 wintypes.DWORD, # DWORD nAclLength, 

799 wintypes.DWORD, # DWORD dwAclRevision 

800 ) 

801 

802 def CreateWellKnownSid(WellKnownSidType: Any) -> Any: 

803 # return a SID for predefined aliases 

804 pSid = (ctypes.c_char * 1)() 

805 cbSid = wintypes.DWORD() 

806 try: 

807 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

808 except OSError as e: 

809 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

810 raise 

811 pSid = (ctypes.c_char * cbSid.value)() 

812 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

813 return pSid[:] 

814 

815 def GetUserNameEx(NameFormat: Any) -> Any: 

816 # return the user or other security principal associated with 

817 # the calling thread 

818 nSize = ctypes.pointer(ctypes.c_ulong(0)) 

819 try: 

820 secur32.GetUserNameExW(NameFormat, None, nSize) 

821 except OSError as e: 

822 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined] 

823 raise 

824 if not nSize.contents.value: 

825 return None 

826 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value) 

827 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize) 

828 return lpNameBuffer.value 

829 

830 def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any: 

831 # return a security identifier (SID) for an account on a system 

832 # and the name of the domain on which the account was found 

833 cbSid = wintypes.DWORD(0) 

834 cchReferencedDomainName = wintypes.DWORD(0) 

835 peUse = wintypes.DWORD(0) 

836 try: 

837 advapi32.LookupAccountNameW( 

838 lpSystemName, 

839 lpAccountName, 

840 None, 

841 ctypes.byref(cbSid), 

842 None, 

843 ctypes.byref(cchReferencedDomainName), 

844 ctypes.byref(peUse), 

845 ) 

846 except OSError as e: 

847 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

848 raise 

849 Sid = ctypes.create_unicode_buffer("", cbSid.value) 

850 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID) 

851 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1) 

852 success = advapi32.LookupAccountNameW( 

853 lpSystemName, 

854 lpAccountName, 

855 pSid, 

856 ctypes.byref(cbSid), 

857 lpReferencedDomainName, 

858 ctypes.byref(cchReferencedDomainName), 

859 ctypes.byref(peUse), 

860 ) 

861 if not success: 

862 raise ctypes.WinError() # type:ignore[attr-defined] 

863 return pSid, lpReferencedDomainName.value, peUse.value 

864 

865 def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any: 

866 # add an access-allowed access control entry (ACE) 

867 # to an access control list (ACL) 

868 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid) 

869 

870 def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any: 

871 # return information about the security of a file or directory 

872 nLength = wintypes.DWORD(0) 

873 try: 

874 advapi32.GetFileSecurityW( 

875 lpFileName, 

876 RequestedInformation, 

877 None, 

878 0, 

879 ctypes.byref(nLength), 

880 ) 

881 except OSError as e: 

882 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

883 raise 

884 if not nLength.value: 

885 return None 

886 pSecurityDescriptor = (wintypes.BYTE * nLength.value)() 

887 advapi32.GetFileSecurityW( 

888 lpFileName, 

889 RequestedInformation, 

890 pSecurityDescriptor, 

891 nLength, 

892 ctypes.byref(nLength), 

893 ) 

894 return pSecurityDescriptor 

895 

896 def SetFileSecurity( 

897 lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any 

898 ) -> Any: 

899 # set the security of a file or directory object 

900 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor) 

901 

902 def SetSecurityDescriptorDacl( 

903 pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any 

904 ) -> Any: 

905 # set information in a discretionary access control list (DACL) 

906 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted) 

907 

def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
    """Return a security descriptor in absolute format.

    The self-relative descriptor passed in is used as the template.
    ``advapi32.MakeAbsoluteSD`` is called twice: first with no output buffers,
    only to collect the five required sizes, then with buffers allocated to
    those sizes.

    Returns the ``BYTE`` array that received the absolute descriptor.

    An ``OSError`` from the first call is tolerated only when its ``winerror``
    is ``ERROR_INSUFFICIENT_BUFFER``; any other ``OSError`` propagates.
    """
    pAbsoluteSecurityDescriptor = None
    lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
    pDacl = None
    lpdwDaclSize = wintypes.DWORD(0)
    pSacl = None
    lpdwSaclSize = wintypes.DWORD(0)
    pOwner = None
    lpdwOwnerSize = wintypes.DWORD(0)
    pPrimaryGroup = None
    lpdwPrimaryGroupSize = wintypes.DWORD(0)

    # Size probe: every output buffer is None, only the size slots are filled.
    try:
        advapi32.MakeAbsoluteSD(
            pSelfRelativeSecurityDescriptor,
            pAbsoluteSecurityDescriptor,
            ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
            pDacl,
            ctypes.byref(lpdwDaclSize),
            pSacl,
            ctypes.byref(lpdwSaclSize),
            pOwner,
            ctypes.byref(lpdwOwnerSize),
            pPrimaryGroup,
            ctypes.byref(lpdwPrimaryGroupSize),
        )
    except OSError as e:
        if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
            raise

    # Allocate each component with the size reported by the probe.
    pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
    pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
    pDacl = ctypes.cast(pDaclData, PACL).contents
    pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
    pSacl = ctypes.cast(pSaclData, PACL).contents
    pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
    pOwner = ctypes.cast(pOwnerData, PSID)
    pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
    pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)

    advapi32.MakeAbsoluteSD(
        pSelfRelativeSecurityDescriptor,
        pAbsoluteSecurityDescriptor,
        ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
        pDacl,
        ctypes.byref(lpdwDaclSize),
        pSacl,
        ctypes.byref(lpdwSaclSize),
        pOwner,
        # Passed by reference like every other size argument; the bare DWORD
        # used previously relied on implicit conversion by ctypes.
        ctypes.byref(lpdwOwnerSize),
        pPrimaryGroup,
        ctypes.byref(lpdwPrimaryGroupSize),
    )
    return pAbsoluteSecurityDescriptor

961 

def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
    """Return a security descriptor in self-relative format.

    The absolute descriptor passed in is used as the template.
    ``advapi32.MakeSelfRelativeSD`` is called twice: first with no output
    buffer, only to learn the required length, then with a buffer of that
    length.

    Returns the ``BYTE`` array that received the self-relative descriptor.

    An ``OSError`` from the first call is tolerated only when its ``winerror``
    is ``ERROR_INSUFFICIENT_BUFFER``; any other ``OSError`` propagates.
    """
    required_length = wintypes.DWORD(0)

    # Size probe: no output buffer is supplied on purpose.
    try:
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            None,
            ctypes.byref(required_length),
        )
    except OSError as e:
        if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
            raise

    self_relative_buffer = (wintypes.BYTE * required_length.value)()
    advapi32.MakeSelfRelativeSD(
        pAbsoluteSecurityDescriptor,
        self_relative_buffer,
        ctypes.byref(required_length),
    )
    return self_relative_buffer

983 

def NewAcl() -> Any:
    """Return a new, initialized ACL (access control list) structure.

    A fixed-size string buffer is allocated, viewed through ``PACL``, and
    passed to ``advapi32.InitializeAcl`` with ``ACL_REVISION``.
    """
    acl_byte_count = 32767  # TODO: calculate this: ctypes.sizeof(ACL) + ?
    backing_buffer = ctypes.create_string_buffer(acl_byte_count)
    new_acl = ctypes.cast(backing_buffer, PACL).contents
    advapi32.InitializeAcl(new_acl, acl_byte_count, ACL_REVISION)
    return new_acl

991 

992 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid) 

993 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0] 

994 

995 Acl = NewAcl() 

996 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins) 

997 AddAccessAllowedAce( 

998 Acl, 

999 ACL_REVISION, 

1000 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE, 

1001 SidUser, 

1002 ) 

1003 

1004 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION) 

1005 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD) 

1006 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0) 

1007 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD) 

1008 

1009 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD) 

1010 

1011 

def get_file_mode(fname: str) -> int:
    """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    Returns
    -------

    int
        The permission bits of the file, with the owner-execute bit and the
        sticky bit masked out.
    """
    raw_mode = Path(fname).stat().st_mode
    # S_IMODE keeps four octal digits, so the mask below is four digits too.
    permission_bits = stat.S_IMODE(raw_mode)
    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so the
    # owner's execute bit is ignored (the 6 in the third octal digit). The sticky
    # bit is tolerated as well (the 6 in the fourth octal digit).
    return permission_bits & 0o6677

1029 

1030 

# Read once at import time: permission enforcement in secure_write is relaxed
# only when JUPYTER_ALLOW_INSECURE_WRITES is "true" or "1" (case-insensitive).
allow_insecure_writes = os.environ.get("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in {
    "true",
    "1",
}

1032 

1033 

@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open a file for writing in the most restricted pattern available.

    The file mode is limited to `0o0600` and the opened file handle is
    yielded to the caller.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary

    Raises
    ------

    RuntimeError
        On non-Windows systems, when the created file does not end up with
        mode `0o0600` and insecure writes have not been allowed.
    """
    if binary:
        mode, encoding = "wb", None
    else:
        mode, encoding = "w", "utf-8"
    flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    # Best-effort removal of any previous file; failures (such as the file
    # not existing) are deliberately ignored.
    try:
        Path(fname).unlink()
    except OSError:
        pass

    if os.name == "nt" and allow_insecure_writes:
        # Mounted file systems can have a number of failure modes in the
        # restriction step, so insecure mode on Windows simply skips it.
        issue_insecure_write_warning()
    elif os.name == "nt":
        # Python on Windows does not respect the group and public bits for
        # chmod, so the file is restricted through its ACL instead. It is
        # created and closed first so permissions are not edited on an open file.
        os.close(os.open(fname, flags, 0o0600))
        flags = os.O_WRONLY | os.O_TRUNC
        win32_restrict_file_to_user(fname)

    descriptor = os.open(fname, flags, 0o0600)
    with os.fdopen(descriptor, mode, encoding=encoding) as f:
        if os.name != "nt":
            # Enforce that the file got the requested permissions before writing
            observed_mode = get_file_mode(fname)
            if observed_mode != 0o0600 and allow_insecure_writes:
                issue_insecure_write_warning()
            elif observed_mode != 0o0600:
                msg = (
                    f"Permissions assignment failed for secure file: '{fname}'."
                    f" Got '{oct(observed_mode)}' instead of '0o0600'."
                )
                raise RuntimeError(msg)
        yield f

1086 

1087 

def issue_insecure_write_warning() -> None:
    """Issue an insecure write warning.

    The warning is rendered as the bare message followed by a newline, without
    the usual filename/line/category prefix. The custom formatter is installed
    only while the warning is emitted, and the previous
    ``warnings.formatwarning`` is restored afterwards, so unrelated warnings
    issued later in the process keep their normal formatting.
    """

    def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:  # noqa: ARG001
        return str(msg) + "\n"

    previous_formatter = warnings.formatwarning
    warnings.formatwarning = format_warning  # type:ignore[assignment]
    try:
        warnings.warn(
            "WARNING: Insecure writes have been enabled via environment variable "
            "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
            "variable or set its value to 'False'.",
            stacklevel=2,
        )
    finally:
        # Restore even if the warning is turned into an exception by a filter.
        warnings.formatwarning = previous_formatter