Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

442 statements  

1"""Path utility functions.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6# Derived from IPython.utils.path, which is 

7# Copyright (c) IPython Development Team. 

8# Distributed under the terms of the Modified BSD License. 

9from __future__ import annotations 

10 

11import errno 

12import os 

13import site 

14import stat 

15import sys 

16import tempfile 

17import warnings 

18from contextlib import contextmanager 

19from pathlib import Path 

20from typing import Any, Iterator, Optional 

21 

22import platformdirs 

23 

24from .utils import deprecation 

25 

# Short alias for os.path.join, used throughout this module.
pjoin = os.path.join

# The application name is lowercase everywhere except on Windows and on
# macOS interpreters that do not live under the Homebrew prefix.
if sys.platform != "win32" and (
    sys.platform != "darwin" or sys.prefix.startswith("/opt/homebrew")
):
    APPNAME = "jupyter"
else:
    APPNAME = "Jupyter"

# BSD marks hidden files with the UF_HIDDEN stat flag. Fall back to its
# numeric value when the stat module does not expose the name.
UF_HIDDEN = getattr(stat, "UF_HIDDEN", 0x8000)

39 

40 

def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
    """Interpret an environment variable as a boolean.

    The variable counts as "set" (True) unless its value, compared case
    insensitively, is one of 'no', 'n', 'false', 'off', '0' or '0.0'.

    When the variable is absent from the environment, ``default`` is
    returned unchanged.
    """
    try:
        raw_value = os.environ[name]
    except KeyError:
        return default

    falsy_spellings = ("no", "n", "false", "off", "0", "0.0")
    return raw_value.lower() not in falsy_spellings

53 

54 

def use_platform_dirs() -> bool:
    """Report whether platformdirs should supply the system-specific paths.

    Controlled by the JUPYTER_PLATFORM_DIRS environment variable. The plan
    is for the default to be False in jupyter_core version 5 and True in
    jupyter_core version 6.
    """
    enabled = envset("JUPYTER_PLATFORM_DIRS", False)
    return enabled  # type:ignore[return-value]

62 

63 

def get_home_dir() -> str:
    """Return the fully resolved path of the user's home directory."""
    # Resolving makes this work even when /home/ is a symlink to /usr/home,
    # as it is on FreeBSD, for example.
    return str(Path("~").expanduser().resolve())

70 

71 

# Cache of temporary directories created by _mkdtemp_once, keyed by the name
# passed to it, so repeated calls in one process reuse the same directory.
_dtemps: dict[str, str] = {}

73 

74 

75def _do_i_own(path: str) -> bool: 

76 """Return whether the current user owns the given path""" 

77 p = Path(path).resolve() 

78 

79 # walk up to first existing parent 

80 while not p.exists() and p != p.parent: 

81 p = p.parent 

82 

83 # simplest check: owner by name 

84 # not always implemented or available 

85 try: 

86 return p.owner() == os.getlogin() 

87 except Exception: # noqa: S110 

88 pass 

89 

90 if hasattr(os, "geteuid"): 

91 try: 

92 st = p.stat() 

93 return st.st_uid == os.geteuid() 

94 except (NotImplementedError, OSError): 

95 # geteuid not always implemented 

96 pass 

97 

98 # no ownership checks worked, check write access 

99 return os.access(p, os.W_OK) 

100 

101 

def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths."""
    environ = os.environ

    # An explicit JUPYTER_PREFER_ENV_PATH signals user intent, so its value wins.
    if "JUPYTER_PREFER_ENV_PATH" in environ:
        return envset("JUPYTER_PREFER_ENV_PATH")  # type:ignore[return-value]

    # Inside a Python virtualenv that we own, default to True
    # (see https://docs.python.org/3/library/venv.html#venv-def)
    in_virtualenv = sys.prefix != sys.base_prefix
    if in_virtualenv and _do_i_own(sys.prefix):
        return True

    # Inside a conda/mamba environment other than the root ("base")
    # environment that we own, default to True as well.
    conda_prefix = environ.get("CONDA_PREFIX")
    in_named_conda_env = (
        conda_prefix is not None
        and sys.prefix.startswith(conda_prefix)
        and environ.get("CONDA_DEFAULT_ENV", "base") != "base"
    )
    return bool(in_named_conda_env and _do_i_own(sys.prefix))

122 

123 

def _mkdtemp_once(name: str) -> str:
    """Create a temporary directory for ``name``, or reuse the earlier one.

    Within a single process, every call with the same name yields the same
    directory.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]

135 

136 

def jupyter_config_dir() -> str:
    """Return the per-user Jupyter config directory for this platform.

    Resolution order: a throwaway temporary directory when JUPYTER_NO_CONFIG
    is set, then JUPYTER_CONFIG_DIR, then the platformdirs location (when
    enabled), and finally ``~/.jupyter``.
    """
    environ = os.environ

    if environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit_dir = environ.get("JUPYTER_CONFIG_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")

156 

157 

def jupyter_data_dir() -> str:
    """Return the per-user directory for Jupyter data files.

    Data files are the non-transient, non-configuration files.

    JUPYTER_DATA_DIR wins when defined; otherwise a platform-appropriate
    location is chosen.
    """
    environ = os.environ

    override = environ.get("JUPYTER_DATA_DIR")
    if override:
        return override

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()

    if sys.platform == "darwin":
        return str(Path(home, "Library", "Jupyter"))

    if sys.platform == "win32":
        appdata = environ.get("APPDATA")
        if not appdata:
            # without APPDATA, keep data next to the config directory
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.
    xdg_data_home = environ.get("XDG_DATA_HOME") or pjoin(home, ".local", "share")
    return pjoin(xdg_data_home, "jupyter")

187 

188 

def jupyter_runtime_dir() -> str:
    """Return the directory used for transient Jupyter files.

    JUPYTER_RUNTIME_DIR is honoured when defined.

    Otherwise the result is (data_dir)/runtime on every platform;
    XDG_RUNTIME_DIR is no longer consulted after various problems.
    """
    runtime_override = os.environ.get("JUPYTER_RUNTIME_DIR")
    if runtime_override:
        return runtime_override
    return pjoin(jupyter_data_dir(), "runtime")

203 

204 

# System-wide data directories. Without platformdirs, the legacy hard-coded
# locations are used and a deprecation notice is issued.
if not use_platform_dirs():
    deprecation(
        "Jupyter is migrating its paths to use standard platformdirs\n"
        "given by the platformdirs library.  To remove this warning and\n"
        "see the appropriate new directories, set the environment variable\n"
        "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
        "The use of platformdirs will be the default in `jupyter_core` v6"
    )
    if os.name != "nt":
        SYSTEM_JUPYTER_PATH = [
            "/usr/local/share/jupyter",
            "/usr/share/jupyter",
        ]
    else:
        programdata = os.environ.get("PROGRAMDATA", None)
        if not programdata:  # PROGRAMDATA is not defined by default on XP.
            SYSTEM_JUPYTER_PATH = [str(Path(sys.prefix, "share", "jupyter"))]
        else:
            SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")]
else:
    SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)

# Data directory belonging to the running Python environment (sys.prefix).
ENV_JUPYTER_PATH: list[str] = [str(Path(sys.prefix, "share", "jupyter"))]

230 

231 

def jupyter_path(*subdirs: str) -> list[str]:
    """Build the list of directories searched for Jupyter data files.

    Entries from the JUPYTER_PATH environment variable come first.

    They are followed by the user-level and environment-level directories;
    the environment-level ones go first when environment paths are
    preferred (see JUPYTER_PREFER_ENV_PATH), otherwise the user-level ones
    do. When Python's site.ENABLE_USER_SITE is true, the ``share/jupyter``
    directory under the Python user base is added to the user-level
    directories.

    The system-wide directories always come last.

    If ``*subdirs`` are given, they are joined onto every element of the
    result.
    """
    search_path: list[str] = []

    # highest priority is the explicit environment variable
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search_path += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # user-level directories
    user_dirs = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so probe for it
        # before calling.
        userbase: Optional[str]
        if hasattr(site, "getuserbase"):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "share", "jupyter"))
            if candidate not in user_dirs:
                user_dirs.append(candidate)

    # environment-level directories that are not already system directories
    env_dirs = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    # environment or user first, depending on the preference
    if prefer_environment_over_user():
        search_path += env_dirs + user_dirs
    else:
        search_path += user_dirs + env_dirs

    # finally, system
    search_path.extend(SYSTEM_JUPYTER_PATH)

    if not subdirs:
        return search_path
    return [pjoin(base, *subdirs) for base in search_path]

289 

290 

# System-wide config directories: platformdirs when enabled, otherwise the
# legacy per-OS locations.
if use_platform_dirs():
    SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)
elif os.name == "nt":
    programdata = os.environ.get("PROGRAMDATA", None)
    # PROGRAMDATA is not defined by default on XP.
    SYSTEM_CONFIG_PATH = [str(Path(programdata, "jupyter"))] if programdata else []
else:
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]

# Config directory belonging to the running Python environment (sys.prefix).
ENV_CONFIG_PATH: list[str] = [str(Path(sys.prefix, "etc", "jupyter"))]

308 

309 

def jupyter_config_path() -> list[str]:
    """Build the list of directories searched for Jupyter config files.

    With JUPYTER_NO_CONFIG set, only the blank config directory is returned.

    Otherwise entries from JUPYTER_CONFIG_PATH come first, followed by the
    user-level and environment-level directories (environment first when
    environment paths are preferred, see JUPYTER_PREFER_ENV_PATH), and the
    system-wide directories last.

    When Python's site.ENABLE_USER_SITE is true, the ``etc/jupyter``
    directory under the Python user base is added to the user-level
    directories.
    """
    environ = os.environ

    if environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    result: list[str] = []

    # highest priority is the explicit environment variable
    explicit = environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        for entry in explicit.split(os.pathsep):
            result.append(entry.rstrip(os.sep))

    # user-level directories
    user_dirs = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        userbase: Optional[str]
        # site.getuserbase() may be missing under virtualenv, so probe for it
        # before calling.
        if hasattr(site, "getuserbase"):
            userbase = site.getuserbase()
        else:
            userbase = site.USER_BASE

        if userbase:
            candidate = str(Path(userbase, "etc", "jupyter"))
            if candidate not in user_dirs:
                user_dirs.append(candidate)

    # environment-level directories that are not already system directories
    env_dirs = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    # environment or user first, depending on the preference
    first, second = (env_dirs, user_dirs) if prefer_environment_over_user() else (user_dirs, env_dirs)
    result.extend(first)
    result.extend(second)

    # finally, system path
    result.extend(SYSTEM_CONFIG_PATH)
    return result

355 

356 

def exists(path: str) -> bool:
    """Stand-in for `os.path.exists` that also works for host mapped
    volumes on Windows containers.

    The path is probed with ``os.lstat``; any OSError means "does not exist".
    """
    try:
        os.lstat(path)
    except OSError:
        found = False
    else:
        found = True
    return found

366 

367 

def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Tell whether a single file is hidden (Windows rules).

    Only the file itself is examined, so combine this with a check of the
    containing directory, or use is_hidden() to cover the file and its
    parent directories in one go.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.
    """
    # dot-files are always treated as hidden
    if Path(abs_path).name.startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = Path(abs_path).stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            # a file that does not exist is not hidden
            return False

    try:
        hidden = bool(
            stat_res.st_file_attributes  # type:ignore[union-attr]
            & stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
        )
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        hidden = False

    return hidden

411 

412 

def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Tell whether a single file is hidden (POSIX rules).

    Only the file itself is examined, so combine this with a check of the
    containing directory, or use is_hidden() to cover the file and its
    parent directories in one go.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.
    """
    # dot-files are always treated as hidden
    if Path(abs_path).name.startswith("."):
        return True

    # stat the path when no result was supplied, or when the supplied result
    # describes a symlink
    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = Path(abs_path).stat()
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            # a file that does not exist is not hidden
            return False

    # directories that cannot be listed count as hidden;
    # use x-access, not actual listing, in case of slow/large listings
    unlistable_dir = stat.S_ISDIR(stat_res.st_mode) and not os.access(
        abs_path, os.X_OK | os.R_OK
    )
    if unlistable_dir:
        return True

    # finally, honour the UF_HIDDEN flag where the platform reports st_flags
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)

451 

452 

# Platform-appropriate single-file hidden check.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix

457 

458 

def is_hidden(abs_path: str, abs_root: str = "") -> bool:
    """Tell whether a path is hidden or lives inside a hidden directory.

    The check starts at the rightmost path element and walks back towards
    the given root. A location is hidden when its name starts with '.' or
    when stat reports the UF_HIDDEN flag for it.

    When abs_path is the same directory as abs_root it is reported as
    visible even if that folder is itself hidden: only files and
    directories *within* abs_root are checked.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for.
    """
    abs_path = os.path.normpath(abs_path)
    abs_root = os.path.normpath(abs_root)

    # the root itself is always visible
    if abs_path == abs_root:
        return False

    if is_file_hidden(abs_path):
        return True

    # NOTE(review): os.path.normpath("") returns ".", so after the
    # normalisation above this branch looks unreachable for the default
    # abs_root; kept as-is to preserve behaviour -- confirm the intent.
    if not abs_root:
        abs_root = abs_path.split(os.sep, 1)[0] + os.sep

    # any dot-prefixed component below the root hides the path
    relative_part = abs_path[len(abs_root) :]
    for component in Path(relative_part).parts:
        if component.startswith("."):
            return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    ancestor = str(Path(abs_path).parent)
    while ancestor and ancestor.startswith(abs_root) and ancestor != abs_root:
        if Path(ancestor).exists():
            try:
                # may fail on Windows junctions
                ancestor_stat = os.lstat(ancestor)
            except OSError:
                return True
            if getattr(ancestor_stat, "st_flags", 0) & UF_HIDDEN:
                return True
        ancestor = str(Path(ancestor).parent)

    return False

511 

512 

def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict access to a Windows file to the current user and administrators.

    The file's DACL is replaced with one that grants the current user
    generic read, generic write and delete rights, and grants the built-in
    Administrators group full access. No other access-allowed entries are
    added.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    When the ``win32api`` module cannot be imported, the work is delegated
    to the ctypes-based ``_win32_restrict_file_to_user_ctypes``.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        # pywin32 is unavailable: use the pure-ctypes implementation instead
        return _win32_restrict_file_to_user_ctypes(fname)

    import ntsecuritycon as con
    import win32security

    # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone")
    # SID of the built-in Administrators group
    admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    # SID of the account the current process runs as
    user, _domain, _type = win32security.LookupAccountName(
        "", win32api.GetUserNameEx(win32api.NameSamCompatible)
    )

    # existing security descriptor of the file (DACL part only)
    sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)

    # build a fresh DACL containing only the two entries below
    dacl = win32security.ACL()
    # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone)
    dacl.AddAccessAllowedAce(
        win32security.ACL_REVISION,
        con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE,
        user,
    )
    dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)

    # install the new DACL in the descriptor and write it back to the file
    sd.SetSecurityDescriptorDacl(1, dacl, 0)
    win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)
    return None

555 

556 

def _win32_restrict_file_to_user_ctypes(fname: str) -> None:
    """Restrict access to a Windows file to the current user and administrators.

    ctypes-only counterpart of ``win32_restrict_file_to_user``: it calls
    advapi32 and secur32 directly instead of going through pywin32. The
    file's DACL is replaced with one that grants the built-in Administrators
    group full access and grants the current user generic read, generic
    write and delete rights.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    import ctypes
    from ctypes import wintypes

    # use_last_error=True lets ctypes.get_last_error() report failures
    advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)  # type:ignore[attr-defined]
    secur32 = ctypes.WinDLL("secur32", use_last_error=True)  # type:ignore[attr-defined]

    # Numeric values of the Win32 constants used below
    NameSamCompatible = 2
    WinBuiltinAdministratorsSid = 26
    DACL_SECURITY_INFORMATION = 4
    ACL_REVISION = 2
    ERROR_INSUFFICIENT_BUFFER = 122
    ERROR_MORE_DATA = 234

    # Access-right bits and the composite masks built from them
    SYNCHRONIZE = 0x100000
    DELETE = 0x00010000
    STANDARD_RIGHTS_REQUIRED = 0xF0000
    STANDARD_RIGHTS_READ = 0x20000
    STANDARD_RIGHTS_WRITE = 0x20000
    FILE_READ_DATA = 1
    FILE_READ_EA = 8
    FILE_READ_ATTRIBUTES = 128
    FILE_WRITE_DATA = 2
    FILE_APPEND_DATA = 4
    FILE_WRITE_EA = 16
    FILE_WRITE_ATTRIBUTES = 256
    FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
    FILE_GENERIC_READ = (
        STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
    )
    FILE_GENERIC_WRITE = (
        STANDARD_RIGHTS_WRITE
        | FILE_WRITE_DATA
        | FILE_WRITE_ATTRIBUTES
        | FILE_WRITE_EA
        | FILE_APPEND_DATA
        | SYNCHRONIZE
    )

    # Header layout of an access control list
    class ACL(ctypes.Structure):
        _fields_ = [
            ("AclRevision", wintypes.BYTE),
            ("Sbz1", wintypes.BYTE),
            ("AclSize", wintypes.WORD),
            ("AceCount", wintypes.WORD),
            ("Sbz2", wintypes.WORD),
        ]

    # Pointer type aliases used in the prototypes below
    PSID = ctypes.c_void_p
    PACL = ctypes.POINTER(ACL)
    PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)

    # errcheck hook shared by every prototype: a zero (FALSE) result is turned
    # into an OSError built from the thread's last error code.
    def _nonzero_success(result: int, func: Any, args: Any) -> Any:  # noqa: ARG001
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())  # type:ignore[attr-defined]
        return args

    # --- foreign function prototypes -------------------------------------

    secur32.GetUserNameExW.errcheck = _nonzero_success
    secur32.GetUserNameExW.restype = wintypes.BOOL
    secur32.GetUserNameExW.argtypes = (
        ctypes.c_int,  # EXTENDED_NAME_FORMAT NameFormat
        wintypes.LPWSTR,  # LPWSTR lpNameBuffer,
        wintypes.PULONG,  # PULONG nSize
    )

    advapi32.CreateWellKnownSid.errcheck = _nonzero_success
    advapi32.CreateWellKnownSid.restype = wintypes.BOOL
    advapi32.CreateWellKnownSid.argtypes = (
        wintypes.DWORD,  # WELL_KNOWN_SID_TYPE WellKnownSidType
        PSID,  # PSID DomainSid
        PSID,  # PSID pSid
        wintypes.PDWORD,  # DWORD *cbSid
    )

    advapi32.LookupAccountNameW.errcheck = _nonzero_success
    advapi32.LookupAccountNameW.restype = wintypes.BOOL
    advapi32.LookupAccountNameW.argtypes = (
        wintypes.LPWSTR,  # LPCWSTR lpSystemName
        wintypes.LPWSTR,  # LPCWSTR lpAccountName
        PSID,  # PSID Sid
        wintypes.LPDWORD,  # LPDWORD cbSid
        wintypes.LPWSTR,  # LPCWSTR ReferencedDomainName
        wintypes.LPDWORD,  # LPDWORD cchReferencedDomainName
        wintypes.LPDWORD,  # PSID_NAME_USE peUse
    )

    advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
    advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
    advapi32.AddAccessAllowedAce.argtypes = (
        PACL,  # PACL pAcl
        wintypes.DWORD,  # DWORD dwAceRevision
        wintypes.DWORD,  # DWORD AccessMask
        PSID,  # PSID pSid
    )

    advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
    advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
    advapi32.SetSecurityDescriptorDacl.argtypes = (
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.BOOL,  # BOOL bDaclPresent
        PACL,  # PACL pDacl
        wintypes.BOOL,  # BOOL bDaclDefaulted
    )

    advapi32.GetFileSecurityW.errcheck = _nonzero_success
    advapi32.GetFileSecurityW.restype = wintypes.BOOL
    advapi32.GetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION RequestedInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.DWORD,  # DWORD nLength
        wintypes.LPDWORD,  # LPDWORD lpnLengthNeeded
    )

    advapi32.SetFileSecurityW.errcheck = _nonzero_success
    advapi32.SetFileSecurityW.restype = wintypes.BOOL
    advapi32.SetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION SecurityInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
    )

    advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
    advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
    advapi32.MakeAbsoluteSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwAbsoluteSecurityDescriptorSize
        PACL,  # PACL pDacl
        wintypes.LPDWORD,  # LPDWORD lpdwDaclSize
        PACL,  # PACL pSacl
        wintypes.LPDWORD,  # LPDWORD lpdwSaclSize
        PSID,  # PSID pOwner
        wintypes.LPDWORD,  # LPDWORD lpdwOwnerSize
        PSID,  # PSID pPrimaryGroup
        wintypes.LPDWORD,  # LPDWORD lpdwPrimaryGroupSize
    )

    advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
    advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
    advapi32.MakeSelfRelativeSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwBufferLength
    )

    advapi32.InitializeAcl.errcheck = _nonzero_success
    advapi32.InitializeAcl.restype = wintypes.BOOL
    advapi32.InitializeAcl.argtypes = (
        PACL,  # PACL pAcl,
        wintypes.DWORD,  # DWORD nAclLength,
        wintypes.DWORD,  # DWORD dwAclRevision
    )

    # --- Python wrappers ---------------------------------------------------
    # Most wrappers call the API twice: a first probe call whose
    # "buffer too small" error is tolerated and which fills in the required
    # size, then a second call with a buffer of that size.

    def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
        # return a SID for predefined aliases
        pSid = (ctypes.c_char * 1)()
        cbSid = wintypes.DWORD()
        try:
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
            # retry with a buffer of the size reported in cbSid
            pSid = (ctypes.c_char * cbSid.value)()
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        # the SID is handed back as a bytes copy of the buffer
        return pSid[:]

    def GetUserNameEx(NameFormat: Any) -> Any:
        # return the user or other security principal associated with
        # the calling thread
        nSize = ctypes.pointer(ctypes.c_ulong(0))
        try:
            secur32.GetUserNameExW(NameFormat, None, nSize)
        except OSError as e:
            if e.winerror != ERROR_MORE_DATA:  # type:ignore[attr-defined]
                raise
        # a reported size of zero means there is no name to fetch
        if not nSize.contents.value:
            return None
        lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
        secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
        return lpNameBuffer.value

    def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
        # return a security identifier (SID) for an account on a system
        # and the name of the domain on which the account was found
        cbSid = wintypes.DWORD(0)
        cchReferencedDomainName = wintypes.DWORD(0)
        peUse = wintypes.DWORD(0)
        try:
            advapi32.LookupAccountNameW(
                lpSystemName,
                lpAccountName,
                None,
                ctypes.byref(cbSid),
                None,
                ctypes.byref(cchReferencedDomainName),
                ctypes.byref(peUse),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # allocate buffers using the sizes reported by the probe call
        Sid = ctypes.create_unicode_buffer("", cbSid.value)
        pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
        lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
        success = advapi32.LookupAccountNameW(
            lpSystemName,
            lpAccountName,
            pSid,
            ctypes.byref(cbSid),
            lpReferencedDomainName,
            ctypes.byref(cchReferencedDomainName),
            ctypes.byref(peUse),
        )
        if not success:
            raise ctypes.WinError()  # type:ignore[attr-defined]
        # (SID pointer, domain name, account type)
        return pSid, lpReferencedDomainName.value, peUse.value

    def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
        # add an access-allowed access control entry (ACE)
        # to an access control list (ACL)
        advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)

    def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
        # return information about the security of a file or directory
        nLength = wintypes.DWORD(0)
        try:
            advapi32.GetFileSecurityW(
                lpFileName,
                RequestedInformation,
                None,
                0,
                ctypes.byref(nLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # a reported length of zero means there is nothing to read
        if not nLength.value:
            return None
        pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
        advapi32.GetFileSecurityW(
            lpFileName,
            RequestedInformation,
            pSecurityDescriptor,
            nLength,
            ctypes.byref(nLength),
        )
        return pSecurityDescriptor

    def SetFileSecurity(
        lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
    ) -> Any:
        # set the security of a file or directory object
        advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)

    def SetSecurityDescriptorDacl(
        pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
    ) -> Any:
        # set information in a discretionary access control list (DACL)
        advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)

    def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
        # return a security descriptor in absolute format
        # by using a security descriptor in self-relative format as a template
        pAbsoluteSecurityDescriptor = None
        lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
        pDacl = None
        lpdwDaclSize = wintypes.DWORD(0)
        pSacl = None
        lpdwSaclSize = wintypes.DWORD(0)
        pOwner = None
        lpdwOwnerSize = wintypes.DWORD(0)
        pPrimaryGroup = None
        lpdwPrimaryGroupSize = wintypes.DWORD(0)
        # probe call: all buffers are None, only the size fields get filled in
        try:
            advapi32.MakeAbsoluteSD(
                pSelfRelativeSecurityDescriptor,
                pAbsoluteSecurityDescriptor,
                ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
                pDacl,
                ctypes.byref(lpdwDaclSize),
                pSacl,
                ctypes.byref(lpdwSaclSize),
                pOwner,
                ctypes.byref(lpdwOwnerSize),
                pPrimaryGroup,
                ctypes.byref(lpdwPrimaryGroupSize),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # allocate one buffer per component using the reported sizes
        pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
        pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
        pDacl = ctypes.cast(pDaclData, PACL).contents
        pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
        pSacl = ctypes.cast(pSaclData, PACL).contents
        pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
        pOwner = ctypes.cast(pOwnerData, PSID)
        pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
        pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
        # NOTE(review): lpdwOwnerSize is passed below without ctypes.byref,
        # unlike the other size arguments -- presumably this relies on ctypes
        # converting the DWORD for the LPDWORD argtype; confirm.
        advapi32.MakeAbsoluteSD(
            pSelfRelativeSecurityDescriptor,
            pAbsoluteSecurityDescriptor,
            ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
            pDacl,
            ctypes.byref(lpdwDaclSize),
            pSacl,
            ctypes.byref(lpdwSaclSize),
            pOwner,
            lpdwOwnerSize,
            pPrimaryGroup,
            ctypes.byref(lpdwPrimaryGroupSize),
        )
        return pAbsoluteSecurityDescriptor

    def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
        # return a security descriptor in self-relative format
        # by using a security descriptor in absolute format as a template
        pSelfRelativeSecurityDescriptor = None
        lpdwBufferLength = wintypes.DWORD(0)
        try:
            advapi32.MakeSelfRelativeSD(
                pAbsoluteSecurityDescriptor,
                pSelfRelativeSecurityDescriptor,
                ctypes.byref(lpdwBufferLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # second call with a buffer of the reported length
        pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            pSelfRelativeSecurityDescriptor,
            ctypes.byref(lpdwBufferLength),
        )
        return pSelfRelativeSecurityDescriptor

    def NewAcl() -> Any:
        # return a new, initialized ACL (access control list) structure
        nAclLength = 32767  # TODO: calculate this: ctypes.sizeof(ACL) + ?
        acl_data = ctypes.create_string_buffer(nAclLength)
        pAcl = ctypes.cast(acl_data, PACL).contents
        advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
        return pAcl

    # --- actual work ---------------------------------------------------------

    # SIDs of the Administrators group and of the current user
    SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
    SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]

    # new ACL: full access for administrators, read/write/delete for the user
    Acl = NewAcl()
    AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
    AddAccessAllowedAce(
        Acl,
        ACL_REVISION,
        FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
        SidUser,
    )

    # read the file's descriptor, convert it to absolute form so its DACL can
    # be replaced, then convert back to self-relative form
    SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
    AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
    SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
    SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)

    # write the updated descriptor back to the file
    SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)

933 

934 

def get_file_mode(fname: str) -> int:
    """Return the permission bits of ``fname`` in a filesystem-tolerant manner.

    Parameters
    ----------

    fname : unicode
        The path to the file to get mode from

    Returns
    -------
    int
        The file's mode as reported by ``stat.S_IMODE``, with the owner
        execute bit and the sticky bit masked out.
    """
    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so the
    # owner's execute bit is tolerated when validating permissions: the least
    # significant bit of the third octal digit is cleared by the mask. The sticky
    # bit is tolerated as well, so the least significant bit of the fourth octal
    # digit is cleared too.
    tolerant_mask = 0o6677  # Use 4 octal digits since S_IMODE does the same
    raw_mode = Path(fname).stat().st_mode
    permission_bits = stat.S_IMODE(raw_mode)
    return permission_bits & tolerant_mask

952 

953 

# Opt-in flag read once at import time: insecure writes are allowed only when
# JUPYTER_ALLOW_INSECURE_WRITES is set to "true" or "1" (case insensitive).
allow_insecure_writes = os.environ.get(
    "JUPYTER_ALLOW_INSECURE_WRITES", "false"
).lower() in {"true", "1"}

955 

956 

@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open a file for writing using the most restrictive pattern available.

    Any existing file at ``fname`` is removed first. The file is then created
    with the mode limited to `0o0600` and the opened file handle is yielded.

    Parameters
    ----------

    fname : unicode
        The path to the file to write

    binary: boolean
        Indicates that the file is binary

    Raises
    ------
    RuntimeError
        On non-Windows systems, when the created file does not end up with
        mode `0o0600` and insecure writes have not been enabled.
    """
    if binary:
        mode, encoding = "wb", None
    else:
        mode, encoding = "w", "utf-8"

    try:
        Path(fname).unlink()
    except OSError:
        # Skip any issues with the file not existing
        pass

    on_windows = os.name == "nt"
    open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    if on_windows:
        if allow_insecure_writes:
            # Mounted file systems can have a number of failure modes when the
            # ACL is edited, so in insecure mode that step is skipped entirely.
            issue_insecure_write_warning()
        else:
            # Python on Windows does not respect the group and public bits for
            # chmod, so additional steps are needed to secure the contents.
            # The file is touched up front so that its permissions are not
            # edited while it is open.
            os.close(os.open(fname, open_flag, 0o0600))
            open_flag = os.O_WRONLY | os.O_TRUNC
            win32_restrict_file_to_user(fname)

    with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
        if not on_windows:
            # Enforce that the file got the requested permissions before writing
            file_mode = get_file_mode(fname)
            if file_mode != 0o0600:
                if not allow_insecure_writes:
                    msg = (
                        f"Permissions assignment failed for secure file: '{fname}'."
                        f" Got '{oct(file_mode)}' instead of '0o0600'."
                    )
                    raise RuntimeError(msg)
                issue_insecure_write_warning()
        yield f

1009 

1010 

def issue_insecure_write_warning() -> None:
    """Issue an insecure write warning.

    The warning is rendered as the bare message followed by a newline. To do
    so, ``warnings.formatwarning`` is replaced only while the warning is being
    emitted and is restored afterwards, so other warnings raised later in the
    process keep whatever formatter was installed before this call.
    """

    def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:  # noqa: ARG001
        return str(msg) + "\n"

    previous_formatter = warnings.formatwarning
    warnings.formatwarning = format_warning  # type:ignore[assignment]
    try:
        # stacklevel=2 attributes the warning to the caller of this function.
        warnings.warn(
            "WARNING: Insecure writes have been enabled via environment variable "
            "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
            "variable or set its value to 'False'.",
            stacklevel=2,
        )
    finally:
        # Restore the global formatter even if the warning was turned into an
        # exception by the active warning filters.
        warnings.formatwarning = previous_formatter