Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 12%

442 statements  

« prev     ^ index     » next       coverage.py v7.3.3, created at 2023-12-15 06:13 +0000

1"""Path utility functions.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6# Derived from IPython.utils.path, which is 

7# Copyright (c) IPython Development Team. 

8# Distributed under the terms of the Modified BSD License. 

9 

10 

11import errno 

12import os 

13import site 

14import stat 

15import sys 

16import tempfile 

17import warnings 

18from contextlib import contextmanager 

19from pathlib import Path 

20from typing import Any, Dict, Iterator, List, Optional 

21 

22import platformdirs 

23 

24from .utils import deprecation 

25 

# Short alias used throughout this module when composing paths.
pjoin = os.path.join

# The application directory name is capitalised only on Windows and on macOS
# interpreters that do not live under the Homebrew prefix; every other
# platform uses the lowercase spelling.
APPNAME = (
    "Jupyter"
    if sys.platform == "win32"
    or (sys.platform == "darwin" and not sys.prefix.startswith("/opt/homebrew"))
    else "jupyter"
)

# BSD "hidden file" stat flag. Take it from the ``stat`` module when it is
# exposed there, otherwise fall back to the raw flag value (0x8000 == 32768).
UF_HIDDEN = getattr(stat, "UF_HIDDEN", 0x8000)

39 

40 

def envset(name: str, default: Optional[bool] = False) -> Optional[bool]:
    """Interpret an environment variable as a boolean flag.

    Parameters
    ----------
    name : str
        Name of the environment variable to inspect.
    default : bool or None, optional
        Value returned unchanged when the variable is not defined at all.

    Returns
    -------
    bool or None
        ``default`` when the variable is undefined. Otherwise ``False`` when
        its value is one of 'no', 'n', 'false', 'off', '0' or '0.0' (compared
        case-insensitively), and ``True`` for any other value, including the
        empty string.
    """
    raw_value = os.environ.get(name)
    if raw_value is None:
        return default

    disabling_values = ("no", "n", "false", "off", "0", "0.0")
    return raw_value.lower() not in disabling_values

53 

54 

def use_platform_dirs() -> bool:
    """Report whether ``platformdirs`` should supply the platform paths.

    The decision is read from the ``JUPYTER_PLATFORM_DIRS`` environment
    variable via :func:`envset`, and is ``False`` when the variable is unset.

    The plan is for this to default to False in jupyter_core version 5 and
    to True in jupyter_core version 6.
    """
    enabled = envset("JUPYTER_PLATFORM_DIRS", False)
    return enabled  # type:ignore[return-value]

62 

63 

def get_home_dir() -> str:
    """Return the user's home directory with symlinks resolved."""
    # Resolving keeps things working when /home is itself a symlink
    # (for example to /usr/home, as on FreeBSD).
    return str(Path(os.path.expanduser("~")).resolve())

71 

72 

# Cache of temporary directories created by ``_mkdtemp_once``, keyed by the
# name passed to it, so repeated calls with the same name reuse one directory.
_dtemps: Dict[str, str] = {}

74 

75 

76def _do_i_own(path: str) -> bool: 

77 """Return whether the current user owns the given path""" 

78 p = Path(path).resolve() 

79 

80 # walk up to first existing parent 

81 while not p.exists() and p != p.parent: 

82 p = p.parent 

83 

84 # simplest check: owner by name 

85 # not always implemented or available 

86 try: 

87 return p.owner() == os.getlogin() 

88 except Exception: # noqa 

89 pass 

90 

91 if hasattr(os, "geteuid"): 

92 try: 

93 st = p.stat() 

94 return st.st_uid == os.geteuid() 

95 except (NotImplementedError, OSError): 

96 # geteuid not always implemented 

97 pass 

98 

99 # no ownership checks worked, check write access 

100 return os.access(p, os.W_OK) 

101 

102 

def prefer_environment_over_user() -> bool:
    """Decide whether environment-level paths outrank user-level paths.

    An explicit ``JUPYTER_PREFER_ENV_PATH`` setting always wins. Otherwise the
    answer is True when the interpreter runs from a virtualenv, or from a
    non-base conda/mamba environment, whose prefix the current user owns.
    """
    # A defined JUPYTER_PREFER_ENV_PATH signals user intent, so honour it.
    if "JUPYTER_PREFER_ENV_PATH" in os.environ:
        return envset("JUPYTER_PREFER_ENV_PATH")  # type:ignore[return-value]

    # Inside a Python virtualenv the prefixes differ
    # (see https://docs.python.org/3/library/venv.html#venv-def).
    in_virtualenv = sys.prefix != sys.base_prefix
    if in_virtualenv and _do_i_own(sys.prefix):
        return True

    # A conda/mamba environment other than the root ("base") one.
    conda_prefix = os.environ.get("CONDA_PREFIX")
    in_non_base_conda = (
        conda_prefix is not None
        and sys.prefix.startswith(conda_prefix)
        and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base"
    )
    return in_non_base_conda and _do_i_own(sys.prefix)

123 

124 

def _mkdtemp_once(name: str) -> str:
    """Return a temporary directory for ``name``, creating it on first use.

    The directory is created with ``tempfile.mkdtemp`` using ``name + "-"`` as
    prefix and remembered in ``_dtemps``, so calling this again with the same
    name in the same process returns the same directory.
    """
    if name not in _dtemps:
        _dtemps[name] = tempfile.mkdtemp(prefix=name + "-")
    return _dtemps[name]

136 

137 

def jupyter_config_dir() -> str:
    """Return the Jupyter config directory for this platform and user.

    Resolution order:

    1. a throwaway temporary directory when ``JUPYTER_NO_CONFIG`` is set;
    2. ``JUPYTER_CONFIG_DIR`` when it is set to a non-empty value;
    3. the ``platformdirs`` user config dir when platformdirs is enabled;
    4. ``~/.jupyter`` otherwise.
    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        return _mkdtemp_once("jupyter-clean-cfg")

    explicit_dir = os.environ.get("JUPYTER_CONFIG_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_config_dir(APPNAME, appauthor=False)

    return pjoin(get_home_dir(), ".jupyter")

157 

158 

def jupyter_data_dir() -> str:
    """Return the directory for Jupyter data files for this platform and user.

    These are non-transient, non-configuration files.

    ``JUPYTER_DATA_DIR`` wins when set to a non-empty value, then the
    ``platformdirs`` user data dir when platformdirs is enabled. Otherwise the
    location depends on the platform:

    * macOS: ``~/Library/Jupyter``
    * Windows: ``%APPDATA%/jupyter`` (resolved), or ``<config dir>/data``
      when ``APPDATA`` is not set
    * elsewhere: ``$XDG_DATA_HOME/jupyter``, defaulting to
      ``~/.local/share/jupyter``
    """
    explicit_dir = os.environ.get("JUPYTER_DATA_DIR")
    if explicit_dir:
        return explicit_dir

    if use_platform_dirs():
        return platformdirs.user_data_dir(APPNAME, appauthor=False)

    home = get_home_dir()

    if sys.platform == "darwin":
        return os.path.join(home, "Library", "Jupyter")

    if os.name == "nt":
        appdata = os.environ.get("APPDATA", None)
        if not appdata:
            return pjoin(jupyter_config_dir(), "data")
        return str(Path(appdata, "jupyter").resolve())

    # Linux, non-OS X Unix, AIX, etc.
    xdg_data_home = os.environ.get("XDG_DATA_HOME", None) or pjoin(home, ".local", "share")
    return pjoin(xdg_data_home, "jupyter")

190 

191 

def jupyter_runtime_dir() -> str:
    """Return the directory for transient Jupyter runtime files.

    ``JUPYTER_RUNTIME_DIR`` is returned when set to a non-empty value.
    Otherwise the default is ``<data dir>/runtime`` on every platform;
    XDG_RUNTIME_DIR is no longer consulted after various problems with it.
    """
    override = os.environ.get("JUPYTER_RUNTIME_DIR")
    if override:
        return override
    return pjoin(jupyter_data_dir(), "runtime")

206 

207 

# System-wide data directories, decided once at import time.
if use_platform_dirs():
    SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)
else:
    # Legacy locations are still in use: tell the user about the migration.
    deprecation(
        "Jupyter is migrating its paths to use standard platformdirs\n"
        "given by the platformdirs library.  To remove this warning and\n"
        "see the appropriate new directories, set the environment variable\n"
        "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n"
        "The use of platformdirs will be the default in `jupyter_core` v6"
    )
    if os.name != "nt":
        SYSTEM_JUPYTER_PATH = [
            "/usr/local/share/jupyter",
            "/usr/share/jupyter",
        ]
    else:
        programdata = os.environ.get("PROGRAMDATA", None)
        # PROGRAMDATA is not defined by default on XP; fall back to sys.prefix.
        SYSTEM_JUPYTER_PATH = (
            [pjoin(programdata, "jupyter")]
            if programdata
            else [os.path.join(sys.prefix, "share", "jupyter")]
        )

# Data directory belonging to the running Python environment.
ENV_JUPYTER_PATH: List[str] = [os.path.join(sys.prefix, "share", "jupyter")]

233 

234 

def jupyter_path(*subdirs: str) -> List[str]:
    """Return the ordered list of directories to search for data files.

    Entries from the ``JUPYTER_PATH`` environment variable come first. They
    are followed by the user-level and environment-level directories, with
    the environment-level ones first when
    :func:`prefer_environment_over_user` says so (for example because
    ``JUPYTER_PREFER_ENV_PATH`` is set). System directories come last.

    If the Python ``site.ENABLE_USER_SITE`` variable is True, the
    ``share/jupyter`` directory under the Python user base is added to the
    user-level directories.

    If ``*subdirs`` are given, they are joined onto every element.

    Examples:

    >>> jupyter_path()
    ['~/.local/share/jupyter', '/usr/local/share/jupyter']
    >>> jupyter_path('kernels')
    ['~/.local/share/jupyter/kernels', '/usr/local/share/jupyter/kernels']
    """
    search_path: List[str] = []

    # Highest priority: the explicit environment variable.
    explicit = os.environ.get("JUPYTER_PATH")
    if explicit:
        search_path += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_data_dir()]
    if site.ENABLE_USER_SITE:
        # site.getuserbase() may be missing under virtualenv, so probe for it.
        userbase: Optional[str]
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            user_share = os.path.join(userbase, "share", "jupyter")
            if user_share not in user_dirs:
                user_dirs.append(user_share)

    # Environment-level directories that are not already system directories.
    env_dirs = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH]

    if prefer_environment_over_user():
        search_path += env_dirs + user_dirs
    else:
        search_path += user_dirs + env_dirs

    # Finally, the system directories.
    search_path += SYSTEM_JUPYTER_PATH

    if not subdirs:
        return search_path
    return [pjoin(base, *subdirs) for base in search_path]

292 

293 

# System-wide config directories, decided once at import time.
if use_platform_dirs():
    SYSTEM_CONFIG_PATH = platformdirs.site_config_dir(
        APPNAME, appauthor=False, multipath=True
    ).split(os.pathsep)
elif os.name == "nt":
    programdata = os.environ.get("PROGRAMDATA", None)
    # PROGRAMDATA is not defined by default on XP; then there is no system path.
    SYSTEM_CONFIG_PATH = [os.path.join(programdata, "jupyter")] if programdata else []
else:
    SYSTEM_CONFIG_PATH = [
        "/usr/local/etc/jupyter",
        "/etc/jupyter",
    ]

# Config directory belonging to the running Python environment.
ENV_CONFIG_PATH: List[str] = [os.path.join(sys.prefix, "etc", "jupyter")]

311 

312 

def jupyter_config_path() -> List[str]:
    """Return the ordered search path for Jupyter config files.

    When ``JUPYTER_NO_CONFIG`` is set, the result is only the blank config
    directory returned by :func:`jupyter_config_dir`.

    Otherwise entries from ``JUPYTER_CONFIG_PATH`` come first. They are
    followed by the user-level and environment-level directories, with the
    environment-level ones first when :func:`prefer_environment_over_user`
    says so (for example because ``JUPYTER_PREFER_ENV_PATH`` is set). System
    directories come last.

    If the Python ``site.ENABLE_USER_SITE`` variable is True, the
    ``etc/jupyter`` directory under the Python user base is added to the
    user-level directories.
    """
    if os.environ.get("JUPYTER_NO_CONFIG"):
        # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set.
        return [jupyter_config_dir()]

    search_path: List[str] = []

    # Highest priority: the explicit environment variable.
    explicit = os.environ.get("JUPYTER_CONFIG_PATH")
    if explicit:
        search_path += [entry.rstrip(os.sep) for entry in explicit.split(os.pathsep)]

    # User-level directories.
    user_dirs = [jupyter_config_dir()]
    if site.ENABLE_USER_SITE:
        userbase: Optional[str]
        # site.getuserbase() may be missing under virtualenv, so probe for it.
        userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE

        if userbase:
            user_etc = os.path.join(userbase, "etc", "jupyter")
            if user_etc not in user_dirs:
                user_dirs.append(user_etc)

    # Environment-level directories that are not already system directories.
    env_dirs = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH]

    if prefer_environment_over_user():
        search_path += env_dirs + user_dirs
    else:
        search_path += user_dirs + env_dirs

    # Finally, the system directories.
    search_path += SYSTEM_CONFIG_PATH
    return search_path

358 

359 

def exists(path: str) -> bool:
    """Replacement for `os.path.exists` which works for host mapped volumes
    on Windows containers.

    The path is probed with ``os.lstat``; any ``OSError`` means "does not
    exist".
    """
    try:
        os.lstat(path)
    except OSError:
        return False
    else:
        return True

369 

370 

def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Is a file hidden? (Windows flavour)

    A file counts as hidden when its name starts with a dot or when its
    ``st_file_attributes`` carry ``FILE_ATTRIBUTE_HIDDEN``.

    This only checks the file itself; it should be called in combination with
    checking the directory containing the file.

    Use is_hidden() instead to check the file and its parent directories.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, this function
        will call stat() internally.

    Returns
    -------
    bool
        True when the file is hidden. A path that does not exist is reported
        as not hidden; other ``OSError`` failures from stat() propagate.
    """
    if os.path.basename(abs_path).startswith("."):
        return True

    if stat_res is None:
        try:
            stat_res = os.stat(abs_path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    try:
        attributes = stat_res.st_file_attributes  # type:ignore[union-attr]
        hidden_bit = stat.FILE_ATTRIBUTE_HIDDEN  # type:ignore[attr-defined]
    except AttributeError:
        # allow AttributeError on PyPy for Windows
        # 'stat_result' object has no attribute 'st_file_attributes'
        # https://foss.heptapod.net/pypy/pypy/-/issues/3469
        warnings.warn(
            "hidden files are not detectable on this system, so no file will be marked as hidden.",
            stacklevel=2,
        )
        return False

    return bool(attributes & hidden_bit)

415 

416 

def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool:
    """Is a file hidden? (POSIX flavour)

    A file counts as hidden when its name starts with a dot, when it is a
    directory that cannot be read and traversed, or when its stat flags carry
    ``UF_HIDDEN``.

    This only checks the file itself; it should be called in combination with
    checking the directory containing the file.

    Use is_hidden() instead to check the file and its parent directories.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check.
    stat_res : os.stat_result, optional
        The result of calling stat() on abs_path. If not passed, or if it
        describes a symlink, this function will call stat() internally.

    Returns
    -------
    bool
        True when the file is hidden. A path that does not exist is reported
        as not hidden; other ``OSError`` failures from stat() propagate.
    """
    if os.path.basename(abs_path).startswith("."):
        return True

    needs_stat = stat_res is None or stat.S_ISLNK(stat_res.st_mode)
    if needs_stat:
        try:
            stat_res = os.stat(abs_path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
            return False

    # Directories must be listable. Use an access check rather than an actual
    # listing, in case of slow/large listings.
    is_directory = stat.S_ISDIR(stat_res.st_mode)  # type:ignore[misc]  # noqa
    if is_directory and not os.access(abs_path, os.X_OK | os.R_OK):
        return True

    # check UF_HIDDEN
    return bool(getattr(stat_res, "st_flags", 0) & UF_HIDDEN)

455 

456 

# Platform-appropriate implementation of the single-file hidden check.
is_file_hidden = is_file_hidden_win if sys.platform == "win32" else is_file_hidden_posix

461 

462 

def is_hidden(abs_path: str, abs_root: str = "") -> bool:
    """Is a file hidden or contained in a hidden directory?

    This will start with the rightmost path element and work backwards to the
    given root to see if a path is hidden or in a hidden directory. Hidden is
    determined by either name starting with '.' or the UF_HIDDEN flag as
    reported by stat.

    If abs_path is the same directory as abs_root, it will be visible even if
    that is a hidden folder. This only checks the visibility of files
    and directories *within* abs_root.

    Parameters
    ----------
    abs_path : unicode
        The absolute path to check for hidden directories.
    abs_root : unicode
        The absolute path of the root directory in which hidden directories
        should be checked for. When empty, the first component of ``abs_path``
        (the filesystem root or drive) is used.

    Returns
    -------
    bool
        True when the path, or any directory between it and the root, is
        hidden. A directory that exists but cannot be stat'ed is treated as
        hidden.
    """
    abs_path = os.path.normpath(abs_path)

    # os.path.normpath("") returns ".", which is truthy: remember whether a
    # root was actually supplied before normalising it, otherwise the
    # "derive the root from abs_path" branch below can never run.
    root_given = bool(abs_root)
    normalized_root = os.path.normpath(abs_root)

    if abs_path == normalized_root:
        return False

    if is_file_hidden(abs_path):
        return True

    if root_given:
        abs_root = normalized_root
    else:
        abs_root = abs_path.split(os.sep, 1)[0] + os.sep
    inside_root = abs_path[len(abs_root) :]
    if any(part.startswith(".") for part in inside_root.split(os.sep)):
        return True

    # check UF_HIDDEN on any location up to root.
    # is_file_hidden() already checked the file, so start from its parent dir
    path = os.path.dirname(abs_path)
    while path and path.startswith(abs_root) and path != abs_root:
        if exists(path):
            try:
                # may fail on Windows junctions
                st = os.lstat(path)
            except OSError:
                return True
            if getattr(st, "st_flags", 0) & UF_HIDDEN:
                return True

        parent = os.path.dirname(path)
        if parent == path:
            # dirname() reached a fixed point that is not abs_root (for
            # example "//" on POSIX, or a UNC share on Windows). Stop here
            # instead of looping forever.
            break
        path = parent

    return False

515 

516 

def win32_restrict_file_to_user(fname: str) -> None:
    """Restrict access to a Windows file to the current user and Administrators.

    The file's DACL is replaced by one with two entries: generic read, generic
    write and delete for the current user, and full access for the built-in
    Administrators group.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    When ``win32api`` cannot be imported, the ctypes-based implementation
    ``_win32_restrict_file_to_user_ctypes`` is used instead.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    try:
        import win32api
    except ImportError:
        return _win32_restrict_file_to_user_ctypes(fname)

    import ntsecuritycon as con
    import win32security

    # Principals that keep access to the file.
    admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid)
    account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
    user, _domain, _type = win32security.LookupAccountName("", account_name)

    sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION)

    # Build a fresh DACL holding only the two allowed entries.
    dacl = win32security.ACL()
    user_rights = con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE
    dacl.AddAccessAllowedAce(win32security.ACL_REVISION, user_rights, user)
    dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins)

    sd.SetSecurityDescriptorDacl(1, dacl, 0)
    win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd)

558 

559 

def _win32_restrict_file_to_user_ctypes(fname: str) -> None:  # noqa
    """Restrict access to a Windows file to the current user and Administrators.

    ctypes-only implementation that calls ``advapi32`` and ``secur32``
    directly. The file's DACL is replaced by one with two entries: full access
    for the built-in Administrators group, and generic read, generic write and
    delete for the current user.

    Follows guidance from win32 library creator:
    http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html

    This method should be executed against an already generated file which
    has no secrets written to it yet.

    Parameters
    ----------

    fname : unicode
        The path to the file to secure
    """
    import ctypes
    from ctypes import wintypes

    # use_last_error=True lets ctypes.get_last_error() report the failure
    # code in _nonzero_success below.
    advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)  # type:ignore[attr-defined]
    secur32 = ctypes.WinDLL("secur32", use_last_error=True)  # type:ignore[attr-defined]

    # Numeric Win32 constants, spelled out here because no win32 helper
    # package is imported in this function.
    NameSamCompatible = 2
    WinBuiltinAdministratorsSid = 26
    DACL_SECURITY_INFORMATION = 4
    ACL_REVISION = 2
    ERROR_INSUFFICIENT_BUFFER = 122
    ERROR_MORE_DATA = 234

    # Access-right bits used to compose the masks granted below.
    SYNCHRONIZE = 0x100000
    DELETE = 0x00010000
    STANDARD_RIGHTS_REQUIRED = 0xF0000
    STANDARD_RIGHTS_READ = 0x20000
    STANDARD_RIGHTS_WRITE = 0x20000
    FILE_READ_DATA = 1
    FILE_READ_EA = 8
    FILE_READ_ATTRIBUTES = 128
    FILE_WRITE_DATA = 2
    FILE_APPEND_DATA = 4
    FILE_WRITE_EA = 16
    FILE_WRITE_ATTRIBUTES = 256
    FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF
    FILE_GENERIC_READ = (
        STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE
    )
    FILE_GENERIC_WRITE = (
        STANDARD_RIGHTS_WRITE
        | FILE_WRITE_DATA
        | FILE_WRITE_ATTRIBUTES
        | FILE_WRITE_EA
        | FILE_APPEND_DATA
        | SYNCHRONIZE
    )

    # Header layout of an ACL; the entries themselves live in the buffer
    # that follows the header (see NewAcl).
    class ACL(ctypes.Structure):
        _fields_ = [  # noqa
            ("AclRevision", wintypes.BYTE),
            ("Sbz1", wintypes.BYTE),
            ("AclSize", wintypes.WORD),
            ("AceCount", wintypes.WORD),
            ("Sbz2", wintypes.WORD),
        ]

    PSID = ctypes.c_void_p
    PACL = ctypes.POINTER(ACL)
    PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE)

    def _nonzero_success(result: int, func: Any, args: Any) -> Any:
        # errcheck hook shared by every API declared below: a falsy result is
        # turned into an exception built from the thread's last error code;
        # otherwise the call arguments are handed back.
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())  # type:ignore[attr-defined]
        return args

    # --- prototypes for the Win32 functions used below ---

    secur32.GetUserNameExW.errcheck = _nonzero_success
    secur32.GetUserNameExW.restype = wintypes.BOOL
    secur32.GetUserNameExW.argtypes = (
        ctypes.c_int,  # EXTENDED_NAME_FORMAT NameFormat
        wintypes.LPWSTR,  # LPWSTR lpNameBuffer,
        wintypes.PULONG,  # PULONG nSize
    )

    advapi32.CreateWellKnownSid.errcheck = _nonzero_success
    advapi32.CreateWellKnownSid.restype = wintypes.BOOL
    advapi32.CreateWellKnownSid.argtypes = (
        wintypes.DWORD,  # WELL_KNOWN_SID_TYPE WellKnownSidType
        PSID,  # PSID DomainSid
        PSID,  # PSID pSid
        wintypes.PDWORD,  # DWORD *cbSid
    )

    advapi32.LookupAccountNameW.errcheck = _nonzero_success
    advapi32.LookupAccountNameW.restype = wintypes.BOOL
    advapi32.LookupAccountNameW.argtypes = (
        wintypes.LPWSTR,  # LPCWSTR lpSystemName
        wintypes.LPWSTR,  # LPCWSTR lpAccountName
        PSID,  # PSID Sid
        wintypes.LPDWORD,  # LPDWORD cbSid
        wintypes.LPWSTR,  # LPCWSTR ReferencedDomainName
        wintypes.LPDWORD,  # LPDWORD cchReferencedDomainName
        wintypes.LPDWORD,  # PSID_NAME_USE peUse
    )

    advapi32.AddAccessAllowedAce.errcheck = _nonzero_success
    advapi32.AddAccessAllowedAce.restype = wintypes.BOOL
    advapi32.AddAccessAllowedAce.argtypes = (
        PACL,  # PACL pAcl
        wintypes.DWORD,  # DWORD dwAceRevision
        wintypes.DWORD,  # DWORD AccessMask
        PSID,  # PSID pSid
    )

    advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success
    advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL
    advapi32.SetSecurityDescriptorDacl.argtypes = (
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.BOOL,  # BOOL bDaclPresent
        PACL,  # PACL pDacl
        wintypes.BOOL,  # BOOL bDaclDefaulted
    )

    advapi32.GetFileSecurityW.errcheck = _nonzero_success
    advapi32.GetFileSecurityW.restype = wintypes.BOOL
    advapi32.GetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION RequestedInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
        wintypes.DWORD,  # DWORD nLength
        wintypes.LPDWORD,  # LPDWORD lpnLengthNeeded
    )

    advapi32.SetFileSecurityW.errcheck = _nonzero_success
    advapi32.SetFileSecurityW.restype = wintypes.BOOL
    advapi32.SetFileSecurityW.argtypes = (
        wintypes.LPCWSTR,  # LPCWSTR lpFileName
        wintypes.DWORD,  # SECURITY_INFORMATION SecurityInformation
        PSECURITY_DESCRIPTOR,  # PSECURITY_DESCRIPTOR pSecurityDescriptor
    )

    advapi32.MakeAbsoluteSD.errcheck = _nonzero_success
    advapi32.MakeAbsoluteSD.restype = wintypes.BOOL
    advapi32.MakeAbsoluteSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwAbsoluteSecurityDescriptorSize
        PACL,  # PACL pDacl
        wintypes.LPDWORD,  # LPDWORD lpdwDaclSize
        PACL,  # PACL pSacl
        wintypes.LPDWORD,  # LPDWORD lpdwSaclSize
        PSID,  # PSID pOwner
        wintypes.LPDWORD,  # LPDWORD lpdwOwnerSize
        PSID,  # PSID pPrimaryGroup
        wintypes.LPDWORD,  # LPDWORD lpdwPrimaryGroupSize
    )

    advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success
    advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL
    advapi32.MakeSelfRelativeSD.argtypes = (
        PSECURITY_DESCRIPTOR,  # pAbsoluteSecurityDescriptor
        PSECURITY_DESCRIPTOR,  # pSelfRelativeSecurityDescriptor
        wintypes.LPDWORD,  # LPDWORD lpdwBufferLength
    )

    advapi32.InitializeAcl.errcheck = _nonzero_success
    advapi32.InitializeAcl.restype = wintypes.BOOL
    advapi32.InitializeAcl.argtypes = (
        PACL,  # PACL pAcl,
        wintypes.DWORD,  # DWORD nAclLength,
        wintypes.DWORD,  # DWORD dwAclRevision
    )

    # --- thin Python wrappers around the prototypes above ---
    # Most follow a "probe, then retry" shape: the first call is made with no
    # (or a too-small) buffer, the resulting OSError is swallowed only when it
    # carries the expected "buffer too small" code, and the call is repeated
    # with a buffer of the size the API reported.

    def CreateWellKnownSid(WellKnownSidType: Any) -> Any:
        # return a SID for predefined aliases
        pSid = (ctypes.c_char * 1)()
        cbSid = wintypes.DWORD()
        try:
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
            # retry with a buffer of the size reported in cbSid
            pSid = (ctypes.c_char * cbSid.value)()
            advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid))
        # the buffer contents are returned as a bytes object
        return pSid[:]

    def GetUserNameEx(NameFormat: Any) -> Any:
        # return the user or other security principal associated with
        # the calling thread
        nSize = ctypes.pointer(ctypes.c_ulong(0))
        try:
            secur32.GetUserNameExW(NameFormat, None, nSize)
        except OSError as e:
            if e.winerror != ERROR_MORE_DATA:  # type:ignore[attr-defined]
                raise
        # a reported size of zero means there is no name to fetch
        if not nSize.contents.value:
            return None
        lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value)
        secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize)
        return lpNameBuffer.value

    def LookupAccountName(lpSystemName: Any, lpAccountName: Any) -> Any:
        # return a security identifier (SID) for an account on a system
        # and the name of the domain on which the account was found
        cbSid = wintypes.DWORD(0)
        cchReferencedDomainName = wintypes.DWORD(0)
        peUse = wintypes.DWORD(0)
        try:
            advapi32.LookupAccountNameW(
                lpSystemName,
                lpAccountName,
                None,
                ctypes.byref(cbSid),
                None,
                ctypes.byref(cchReferencedDomainName),
                ctypes.byref(peUse),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # NOTE(review): the SID buffer is allocated as cbSid *wide characters*,
        # i.e. presumably larger than the cbSid bytes reported — confirm.
        Sid = ctypes.create_unicode_buffer("", cbSid.value)
        pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID)
        lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1)
        success = advapi32.LookupAccountNameW(
            lpSystemName,
            lpAccountName,
            pSid,
            ctypes.byref(cbSid),
            lpReferencedDomainName,
            ctypes.byref(cchReferencedDomainName),
            ctypes.byref(peUse),
        )
        # NOTE(review): the _nonzero_success errcheck already raises on a zero
        # result, so this check looks redundant — confirm before removing.
        if not success:
            raise ctypes.WinError()  # type:ignore[attr-defined]
        return pSid, lpReferencedDomainName.value, peUse.value

    def AddAccessAllowedAce(pAcl: Any, dwAceRevision: Any, AccessMask: Any, pSid: Any) -> Any:
        # add an access-allowed access control entry (ACE)
        # to an access control list (ACL)
        advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid)

    def GetFileSecurity(lpFileName: Any, RequestedInformation: Any) -> Any:
        # return information about the security of a file or directory
        nLength = wintypes.DWORD(0)
        try:
            advapi32.GetFileSecurityW(
                lpFileName,
                RequestedInformation,
                None,
                0,
                ctypes.byref(nLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # a reported length of zero means there is nothing to read
        if not nLength.value:
            return None
        pSecurityDescriptor = (wintypes.BYTE * nLength.value)()
        advapi32.GetFileSecurityW(
            lpFileName,
            RequestedInformation,
            pSecurityDescriptor,
            nLength,
            ctypes.byref(nLength),
        )
        return pSecurityDescriptor

    def SetFileSecurity(
        lpFileName: Any, RequestedInformation: Any, pSecurityDescriptor: Any
    ) -> Any:
        # set the security of a file or directory object
        advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor)

    def SetSecurityDescriptorDacl(
        pSecurityDescriptor: Any, bDaclPresent: Any, pDacl: Any, bDaclDefaulted: Any
    ) -> Any:
        # set information in a discretionary access control list (DACL)
        advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted)

    def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor: Any) -> Any:
        # return a security descriptor in absolute format
        # by using a security descriptor in self-relative format as a template
        pAbsoluteSecurityDescriptor = None
        lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0)
        pDacl = None
        lpdwDaclSize = wintypes.DWORD(0)
        pSacl = None
        lpdwSaclSize = wintypes.DWORD(0)
        pOwner = None
        lpdwOwnerSize = wintypes.DWORD(0)
        pPrimaryGroup = None
        lpdwPrimaryGroupSize = wintypes.DWORD(0)
        # first call: all buffers are None, only the five size outputs are filled
        try:
            advapi32.MakeAbsoluteSD(
                pSelfRelativeSecurityDescriptor,
                pAbsoluteSecurityDescriptor,
                ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
                pDacl,
                ctypes.byref(lpdwDaclSize),
                pSacl,
                ctypes.byref(lpdwSaclSize),
                pOwner,
                ctypes.byref(lpdwOwnerSize),
                pPrimaryGroup,
                ctypes.byref(lpdwPrimaryGroupSize),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # allocate one buffer per component, using the sizes reported above
        pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)()
        pDaclData = (wintypes.BYTE * lpdwDaclSize.value)()
        pDacl = ctypes.cast(pDaclData, PACL).contents
        pSaclData = (wintypes.BYTE * lpdwSaclSize.value)()
        pSacl = ctypes.cast(pSaclData, PACL).contents
        pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)()
        pOwner = ctypes.cast(pOwnerData, PSID)
        pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)()
        pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID)
        # NOTE(review): lpdwOwnerSize is passed below without ctypes.byref(),
        # unlike the other size arguments and unlike the first call; presumably
        # ctypes converts the instance for the LPDWORD argtype — confirm.
        advapi32.MakeAbsoluteSD(
            pSelfRelativeSecurityDescriptor,
            pAbsoluteSecurityDescriptor,
            ctypes.byref(lpdwAbsoluteSecurityDescriptorSize),
            pDacl,
            ctypes.byref(lpdwDaclSize),
            pSacl,
            ctypes.byref(lpdwSaclSize),
            pOwner,
            lpdwOwnerSize,
            pPrimaryGroup,
            ctypes.byref(lpdwPrimaryGroupSize),
        )
        return pAbsoluteSecurityDescriptor

    def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor: Any) -> Any:
        # return a security descriptor in self-relative format
        # by using a security descriptor in absolute format as a template
        pSelfRelativeSecurityDescriptor = None
        lpdwBufferLength = wintypes.DWORD(0)
        try:
            advapi32.MakeSelfRelativeSD(
                pAbsoluteSecurityDescriptor,
                pSelfRelativeSecurityDescriptor,
                ctypes.byref(lpdwBufferLength),
            )
        except OSError as e:
            if e.winerror != ERROR_INSUFFICIENT_BUFFER:  # type:ignore[attr-defined]
                raise
        # retry with a buffer of the reported length
        pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)()
        advapi32.MakeSelfRelativeSD(
            pAbsoluteSecurityDescriptor,
            pSelfRelativeSecurityDescriptor,
            ctypes.byref(lpdwBufferLength),
        )
        return pSelfRelativeSecurityDescriptor

    def NewAcl() -> Any:
        # return a new, initialized ACL (access control list) structure
        nAclLength = 32767  # TODO: calculate this: ctypes.sizeof(ACL) + ?
        acl_data = ctypes.create_string_buffer(nAclLength)
        pAcl = ctypes.cast(acl_data, PACL).contents
        advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION)
        return pAcl

    # --- actual work ---

    # Principals that keep access: built-in Administrators and the current user.
    SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid)
    SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0]

    # New ACL: full access for Administrators, read/write/delete for the user.
    Acl = NewAcl()
    AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins)
    AddAccessAllowedAce(
        Acl,
        ACL_REVISION,
        FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE,
        SidUser,
    )

    # Read the file's descriptor, convert it to absolute form to install the
    # new DACL, then convert back to self-relative form for writing.
    SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION)
    AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD)
    SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0)
    SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD)

    SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD)

936 

937 

def get_file_mode(fname: str) -> int:
    """Return the permission bits of a file in a filesystem-tolerant manner.

    Parameters
    ----------

    fname : str
        The path to the file to get mode from

    Returns
    -------
    int
        The file's mode bits (as produced by ``stat.S_IMODE``) with the
        owner-execute bit and the sticky bit cleared.
    """
    # Some filesystems (e.g., CIFS) auto-enable the execute bit on files, so the
    # owner's execute bit is tolerated when validating permissions. The sticky
    # bit is tolerated as well. Clearing both is equivalent to masking the
    # four-octal-digit mode with 0o6677.
    tolerated_bits = stat.S_IXUSR | stat.S_ISVTX
    permission_bits = stat.S_IMODE(os.stat(fname).st_mode)
    return permission_bits & ~tolerated_bits

955 

956 

# Opt-in escape hatch, read once at import time: only the values "true" or "1"
# (case-insensitive) in JUPYTER_ALLOW_INSECURE_WRITES enable insecure writes.
allow_insecure_writes = os.environ.get(
    "JUPYTER_ALLOW_INSECURE_WRITES", "false"
).lower() in ("true", "1")

958 

959 

@contextmanager
def secure_write(fname: str, binary: bool = False) -> Iterator[Any]:
    """Open a file for writing with the most restrictive permissions available.

    Any existing file at ``fname`` is removed first, then the file is
    (re)created with mode ``0o0600`` and the opened file handle is yielded.

    Parameters
    ----------

    fname : str
        The path to the file to write

    binary : bool
        Indicates that the file is binary. Text files are opened with
        UTF-8 encoding.

    Raises
    ------
    RuntimeError
        On non-Windows systems, if the created file does not have mode
        ``0o0600`` and insecure writes have not been allowed.
    """
    if binary:
        mode, encoding = "wb", None
    else:
        mode, encoding = "w", "utf-8"
    open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC

    # Start from a clean slate. Removal is best-effort: a missing file (or any
    # other OS-level removal problem) is ignored here.
    try:
        os.remove(fname)
    except OSError:
        pass

    on_windows = os.name == "nt"
    if on_windows and allow_insecure_writes:
        # Mounted file systems can have a number of failure modes when editing ACLs.
        # For windows machines in insecure mode we simply skip that step to avoid failures.
        issue_insecure_write_warning()
    elif on_windows:
        # Python on windows does not respect the group and public bits for chmod, so
        # additional steps are needed to secure the contents.
        # Touch the file pre-emptively to avoid editing permissions on an open file.
        fd = os.open(fname, open_flag, 0o0600)
        os.close(fd)
        # The file exists now, so the real open below must not create it again.
        open_flag = os.O_WRONLY | os.O_TRUNC
        win32_restrict_file_to_user(fname)

    with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f:
        if not on_windows:
            # Enforce that the file got the requested permissions before writing
            file_mode = get_file_mode(fname)
            if file_mode != 0o0600:  # noqa
                if not allow_insecure_writes:
                    msg = (
                        f"Permissions assignment failed for secure file: '{fname}'."
                        f" Got '{oct(file_mode)}' instead of '0o0600'."
                    )
                    raise RuntimeError(msg)
                issue_insecure_write_warning()
        yield f

1012 

1013 

def issue_insecure_write_warning() -> None:
    """Issue an insecure write warning.

    The warning is emitted with a minimal formatter that prints only the
    message text. The formatter is installed on ``warnings.formatwarning``
    just for the duration of the ``warnings.warn`` call and the previous
    formatter is restored afterwards, so other warnings raised by the
    process keep their normal category/file/line formatting.
    """

    def format_warning(msg: str, *args: Any, **kwargs: Any) -> str:
        # Drop category, filename and line number; show the bare message.
        return str(msg) + "\n"

    previous_formatter = warnings.formatwarning
    warnings.formatwarning = format_warning  # type:ignore[assignment]
    try:
        warnings.warn(
            "WARNING: Insecure writes have been enabled via environment variable "
            "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the "
            "variable or set its value to 'False'.",
            stacklevel=2,
        )
    finally:
        # Always restore the global formatter, even if the warning filter
        # turned the warning into an exception.
        warnings.formatwarning = previous_formatter