Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 18%

440 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 06:10 +0000

1"""Path utility functions.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6# Derived from IPython.utils.path, which is 

7# Copyright (c) IPython Development Team. 

8# Distributed under the terms of the Modified BSD License. 

9 

10 

11import errno 

12import os 

13import site 

14import stat 

15import sys 

16import tempfile 

17import warnings 

18from contextlib import contextmanager 

19from pathlib import Path 

20from typing import Any, Dict, Iterator, List, Optional 

21 

22import platformdirs 

23 

24from .utils import deprecation 

25 

26pjoin = os.path.join 

27 

28# Capitalize Jupyter in paths only on Windows and MacOS 

29APPNAME = "Jupyter" if sys.platform in ("win32", "darwin") else "jupyter" 

30 

31# UF_HIDDEN is a stat flag not defined in the stat module. 

32# It is used by BSD to indicate hidden files. 

33UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768) 

34 

35 

36def envset(name: str, default: Optional[bool] = False) -> Optional[bool]: 

37 """Return the boolean value of a given environment variable. 

38 

39 An environment variable is considered set if it is assigned to a value 

40 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive) 

41 

42 If the environment variable is not defined, the default value is returned. 

43 """ 

44 if name not in os.environ: 

45 return default 

46 

47 return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"] 

48 

49 

50def use_platform_dirs() -> bool: 

51 """Determine if platformdirs should be used for system-specific paths. 

52 

53 We plan for this to default to False in jupyter_core version 5 and to True 

54 in jupyter_core version 6. 

55 """ 

56 return envset("JUPYTER_PLATFORM_DIRS", False) # type:ignore[return-value] 

57 

58 

59def get_home_dir() -> str: 

60 """Get the real path of the home directory""" 

61 homedir = os.path.expanduser("~") 

62 # Next line will make things work even when /home/ is a symlink to 

63 # /usr/home as it is on FreeBSD, for example 

64 homedir = str(Path(homedir).resolve()) 

65 return homedir 

66 

67 

68_dtemps: Dict[str, str] = {} 

69 

70 

71def _do_i_own(path: str) -> bool: 

72 """Return whether the current user owns the given path""" 

73 p = Path(path).resolve() 

74 

75 # walk up to first existing parent 

76 while not p.exists() and p != p.parent: 

77 p = p.parent 

78 

79 # simplest check: owner by name 

80 # not always implemented or available 

81 try: 

82 return p.owner() == os.getlogin() 

83 except Exception: # noqa 

84 pass 

85 

86 if hasattr(os, 'geteuid'): 

87 try: 

88 st = p.stat() 

89 return st.st_uid == os.geteuid() 

90 except (NotImplementedError, OSError): 

91 # geteuid not always implemented 

92 pass 

93 

94 # no ownership checks worked, check write access 

95 return os.access(p, os.W_OK) 

96 

97 

98def prefer_environment_over_user() -> bool: 

99 """Determine if environment-level paths should take precedence over user-level paths.""" 

100 # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value 

101 if "JUPYTER_PREFER_ENV_PATH" in os.environ: 

102 return envset("JUPYTER_PREFER_ENV_PATH") # type:ignore[return-value] 

103 

104 # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def) 

105 if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix): 

106 return True 

107 

108 # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True 

109 if ( 

110 "CONDA_PREFIX" in os.environ 

111 and sys.prefix.startswith(os.environ["CONDA_PREFIX"]) 

112 and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base" 

113 and _do_i_own(sys.prefix) 

114 ): 

115 return True 

116 

117 return False 

118 

119 

120def _mkdtemp_once(name: str) -> str: 

121 """Make or reuse a temporary directory. 

122 

123 If this is called with the same name in the same process, it will return 

124 the same directory. 

125 """ 

126 try: 

127 return _dtemps[name] 

128 except KeyError: 

129 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-") 

130 return d 

131 

132 

133def jupyter_config_dir() -> str: 

134 """Get the Jupyter config directory for this platform and user. 

135 

136 Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate 

137 directory for the platform. 

138 """ 

139 

140 env = os.environ 

141 if env.get("JUPYTER_NO_CONFIG"): 

142 return _mkdtemp_once("jupyter-clean-cfg") 

143 

144 if env.get("JUPYTER_CONFIG_DIR"): 

145 return env["JUPYTER_CONFIG_DIR"] 

146 

147 if use_platform_dirs(): 

148 return platformdirs.user_config_dir(APPNAME, appauthor=False) 

149 

150 home_dir = get_home_dir() 

151 return pjoin(home_dir, ".jupyter") 

152 

153 

154def jupyter_data_dir() -> str: 

155 """Get the config directory for Jupyter data files for this platform and user. 

156 

157 These are non-transient, non-configuration files. 

158 

159 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path. 

160 """ 

161 env = os.environ 

162 

163 if env.get("JUPYTER_DATA_DIR"): 

164 return env["JUPYTER_DATA_DIR"] 

165 

166 if use_platform_dirs(): 

167 return platformdirs.user_data_dir(APPNAME, appauthor=False) 

168 

169 home = get_home_dir() 

170 

171 if sys.platform == "darwin": 

172 return os.path.join(home, "Library", "Jupyter") 

173 elif os.name == "nt": 

174 appdata = os.environ.get("APPDATA", None) 

175 if appdata: 

176 return str(Path(appdata, "jupyter").resolve()) 

177 else: 

178 return pjoin(jupyter_config_dir(), "data") 

179 else: 

180 # Linux, non-OS X Unix, AIX, etc. 

181 xdg = env.get("XDG_DATA_HOME", None) 

182 if not xdg: 

183 xdg = pjoin(home, ".local", "share") 

184 return pjoin(xdg, "jupyter") 

185 

186 

187def jupyter_runtime_dir() -> str: 

188 """Return the runtime dir for transient jupyter files. 

189 

190 Returns JUPYTER_RUNTIME_DIR if defined. 

191 

192 The default is now (data_dir)/runtime on all platforms; 

193 we no longer use XDG_RUNTIME_DIR after various problems. 

194 """ 

195 env = os.environ 

196 

197 if env.get("JUPYTER_RUNTIME_DIR"): 

198 return env["JUPYTER_RUNTIME_DIR"] 

199 

200 return pjoin(jupyter_data_dir(), "runtime") 

201 

202 

203if use_platform_dirs(): 

204 SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir( 

205 APPNAME, appauthor=False, multipath=True 

206 ).split(os.pathsep) 

207else: 

208 deprecation( 

209 "Jupyter is migrating its paths to use standard platformdirs\n" 

210 "given by the platformdirs library. To remove this warning and\n" 

211 "see the appropriate new directories, set the environment variable\n" 

212 "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n" 

213 "The use of platformdirs will be the default in `jupyter_core` v6" 

214 ) 

215 if os.name == "nt": 

216 programdata = os.environ.get("PROGRAMDATA", None) 

217 if programdata: 

218 SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")] 

219 else: # PROGRAMDATA is not defined by default on XP. 

220 SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, "share", "jupyter")] 

221 else: 

222 SYSTEM_JUPYTER_PATH = [ 

223 "/usr/local/share/jupyter", 

224 "/usr/share/jupyter", 

225 ] 

226 

227ENV_JUPYTER_PATH: List[str] = [os.path.join(sys.prefix, "share", "jupyter")] 

228 

229 

230def jupyter_path(*subdirs: str) -> List[str]: 

231 """Return a list of directories to search for data files 

232 

233 JUPYTER_PATH environment variable has highest priority. 

234 

235 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level 

236 directories will have priority over user-level directories. 

237 

238 If the Python site.ENABLE_USER_SITE variable is True, we also add the 

239 appropriate Python user site subdirectory to the user-level directories. 

240 

241 

242 If ``*subdirs`` are given, that subdirectory will be added to each element. 

243 

244 Examples: 

245 

246 >>> jupyter_path() 

247 ['~/.local/jupyter', '/usr/local/share/jupyter'] 

248 >>> jupyter_path('kernels') 

249 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels'] 

250 """ 

251 

252 paths: List[str] = [] 

253 

254 # highest priority is explicit environment variable 

255 if os.environ.get("JUPYTER_PATH"): 

256 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep)) 

257 

258 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag 

259 user = [jupyter_data_dir()] 

260 if site.ENABLE_USER_SITE: 

261 # Check if site.getuserbase() exists to be compatible with virtualenv, 

262 # which often does not have this method. 

263 userbase: Optional[str] 

264 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE 

265 

266 if userbase: 

267 userdir = os.path.join(userbase, "share", "jupyter") 

268 if userdir not in user: 

269 user.append(userdir) 

270 

271 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH] 

272 

273 if prefer_environment_over_user(): 

274 paths.extend(env) 

275 paths.extend(user) 

276 else: 

277 paths.extend(user) 

278 paths.extend(env) 

279 

280 # finally, system 

281 paths.extend(SYSTEM_JUPYTER_PATH) 

282 

283 # add subdir, if requested 

284 if subdirs: 

285 paths = [pjoin(p, *subdirs) for p in paths] 

286 return paths 

287 

288 

289if use_platform_dirs(): 

290 SYSTEM_CONFIG_PATH = platformdirs.site_config_dir( 

291 APPNAME, appauthor=False, multipath=True 

292 ).split(os.pathsep) 

293else: 

294 if os.name == "nt": # noqa 

295 programdata = os.environ.get("PROGRAMDATA", None) 

296 if programdata: # noqa 

297 SYSTEM_CONFIG_PATH = [os.path.join(programdata, "jupyter")] 

298 else: # PROGRAMDATA is not defined by default on XP. 

299 SYSTEM_CONFIG_PATH = [] 

300 else: 

301 SYSTEM_CONFIG_PATH = [ 

302 "/usr/local/etc/jupyter", 

303 "/etc/jupyter", 

304 ] 

305ENV_CONFIG_PATH: List[str] = [os.path.join(sys.prefix, "etc", "jupyter")] 

306 

307 

308def jupyter_config_path() -> List[str]: 

309 """Return the search path for Jupyter config files as a list. 

310 

311 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the 

312 environment-level directories will have priority over user-level 

313 directories. 

314 

315 If the Python site.ENABLE_USER_SITE variable is True, we also add the 

316 appropriate Python user site subdirectory to the user-level directories. 

317 """ 

318 if os.environ.get("JUPYTER_NO_CONFIG"): 

319 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set. 

320 return [jupyter_config_dir()] 

321 

322 paths: List[str] = [] 

323 

324 # highest priority is explicit environment variable 

325 if os.environ.get("JUPYTER_CONFIG_PATH"): 

326 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep)) 

327 

328 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag 

329 user = [jupyter_config_dir()] 

330 if site.ENABLE_USER_SITE: 

331 userbase: Optional[str] 

332 # Check if site.getuserbase() exists to be compatible with virtualenv, 

333 # which often does not have this method. 

334 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE 

335 

336 if userbase: 

337 userdir = os.path.join(userbase, "etc", "jupyter") 

338 if userdir not in user: 

339 user.append(userdir) 

340 

341 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH] 

342 

343 if prefer_environment_over_user(): 

344 paths.extend(env) 

345 paths.extend(user) 

346 else: 

347 paths.extend(user) 

348 paths.extend(env) 

349 

350 # Finally, system path 

351 paths.extend(SYSTEM_CONFIG_PATH) 

352 return paths 

353 

354 

355def exists(path: str) -> bool: 

356 """Replacement for `os.path.exists` which works for host mapped volumes 

357 on Windows containers 

358 """ 

359 try: 

360 os.lstat(path) 

361 except OSError: 

362 return False 

363 return True 

364 

365 

366def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool: 

367 """Is a file hidden? 

368 

369 This only checks the file itself; it should be called in combination with 

370 checking the directory containing the file. 

371 

372 Use is_hidden() instead to check the file and its parent directories. 

373 

374 Parameters 

375 ---------- 

376 abs_path : unicode 

377 The absolute path to check. 

378 stat_res : os.stat_result, optional 

379 The result of calling stat() on abs_path. If not passed, this function 

380 will call stat() internally. 

381 """ 

382 if os.path.basename(abs_path).startswith("."): 

383 return True 

384 

385 if stat_res is None: 

386 try: 

387 stat_res = os.stat(abs_path) 

388 except OSError as e: 

389 if e.errno == errno.ENOENT: 

390 return False 

391 raise 

392 

393 try: 

394 if stat_res.st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN: # type:ignore 

395 return True 

396 except AttributeError: 

397 # allow AttributeError on PyPy for Windows 

398 # 'stat_result' object has no attribute 'st_file_attributes' 

399 # https://foss.heptapod.net/pypy/pypy/-/issues/3469 

400 warnings.warn( 

401 "hidden files are not detectable on this system, so no file will be marked as hidden." 

402 ) 

403 pass 

404 

405 return False 

406 

407 

408def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool: 

409 """Is a file hidden? 

410 

411 This only checks the file itself; it should be called in combination with 

412 checking the directory containing the file. 

413 

414 Use is_hidden() instead to check the file and its parent directories. 

415 

416 Parameters 

417 ---------- 

418 abs_path : unicode 

419 The absolute path to check. 

420 stat_res : os.stat_result, optional 

421 The result of calling stat() on abs_path. If not passed, this function 

422 will call stat() internally. 

423 """ 

424 if os.path.basename(abs_path).startswith("."): 

425 return True 

426 

427 if stat_res is None or stat.S_ISLNK(stat_res.st_mode): 

428 try: 

429 stat_res = os.stat(abs_path) 

430 except OSError as e: 

431 if e.errno == errno.ENOENT: 

432 return False 

433 raise 

434 

435 # check that dirs can be listed 

436 if stat.S_ISDIR(stat_res.st_mode): # type:ignore[misc] # noqa 

437 # use x-access, not actual listing, in case of slow/large listings 

438 if not os.access(abs_path, os.X_OK | os.R_OK): 

439 return True 

440 

441 # check UF_HIDDEN 

442 if getattr(stat_res, "st_flags", 0) & UF_HIDDEN: 

443 return True 

444 

445 return False 

446 

447 

448if sys.platform == "win32": 

449 is_file_hidden = is_file_hidden_win 

450else: 

451 is_file_hidden = is_file_hidden_posix 

452 

453 

454def is_hidden(abs_path: str, abs_root: str = "") -> bool: 

455 """Is a file hidden or contained in a hidden directory? 

456 

457 This will start with the rightmost path element and work backwards to the 

458 given root to see if a path is hidden or in a hidden directory. Hidden is 

459 determined by either name starting with '.' or the UF_HIDDEN flag as 

460 reported by stat. 

461 

462 If abs_path is the same directory as abs_root, it will be visible even if 

463 that is a hidden folder. This only checks the visibility of files 

464 and directories *within* abs_root. 

465 

466 Parameters 

467 ---------- 

468 abs_path : unicode 

469 The absolute path to check for hidden directories. 

470 abs_root : unicode 

471 The absolute path of the root directory in which hidden directories 

472 should be checked for. 

473 """ 

474 abs_path = os.path.normpath(abs_path) 

475 abs_root = os.path.normpath(abs_root) 

476 

477 if abs_path == abs_root: 

478 return False 

479 

480 if is_file_hidden(abs_path): 

481 return True 

482 

483 if not abs_root: 

484 abs_root = abs_path.split(os.sep, 1)[0] + os.sep 

485 inside_root = abs_path[len(abs_root) :] 

486 if any(part.startswith(".") for part in inside_root.split(os.sep)): 

487 return True 

488 

489 # check UF_HIDDEN on any location up to root. 

490 # is_file_hidden() already checked the file, so start from its parent dir 

491 path = os.path.dirname(abs_path) 

492 while path and path.startswith(abs_root) and path != abs_root: 

493 if not exists(path): 

494 path = os.path.dirname(path) 

495 continue 

496 try: 

497 # may fail on Windows junctions 

498 st = os.lstat(path) 

499 except OSError: 

500 return True 

501 if getattr(st, "st_flags", 0) & UF_HIDDEN: 

502 return True 

503 path = os.path.dirname(path) 

504 

505 return False 

506 

507 

508def win32_restrict_file_to_user(fname: str) -> None: 

509 """Secure a windows file to read-only access for the user. 

510 Follows guidance from win32 library creator: 

511 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 

512 

513 This method should be executed against an already generated file which 

514 has no secrets written to it yet. 

515 

516 Parameters 

517 ---------- 

518 

519 fname : unicode 

520 The path to the file to secure 

521 """ 

522 try: 

523 import win32api 

524 except ImportError: 

525 return _win32_restrict_file_to_user_ctypes(fname) 

526 

527 import ntsecuritycon as con 

528 import win32security 

529 

530 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone") 

531 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid) 

532 user, _domain, _type = win32security.LookupAccountName( 

533 "", win32api.GetUserNameEx(win32api.NameSamCompatible) 

534 ) 

535 

536 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION) 

537 

538 dacl = win32security.ACL() 

539 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone) 

540 dacl.AddAccessAllowedAce( 

541 win32security.ACL_REVISION, 

542 con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE, 

543 user, 

544 ) 

545 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins) 

546 

547 sd.SetSecurityDescriptorDacl(1, dacl, 0) 

548 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd) 

549 

550 

551def _win32_restrict_file_to_user_ctypes(fname: str) -> None: # noqa 

552 """Secure a windows file to read-only access for the user. 

553 

554 Follows guidance from win32 library creator: 

555 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 

556 

557 This method should be executed against an already generated file which 

558 has no secrets written to it yet. 

559 

560 Parameters 

561 ---------- 

562 

563 fname : unicode 

564 The path to the file to secure 

565 """ 

566 import ctypes 

567 from ctypes import wintypes 

568 

569 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined] 

570 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined] 

571 

572 NameSamCompatible = 2 

573 WinBuiltinAdministratorsSid = 26 

574 DACL_SECURITY_INFORMATION = 4 

575 ACL_REVISION = 2 

576 ERROR_INSUFFICIENT_BUFFER = 122 

577 ERROR_MORE_DATA = 234 

578 

579 SYNCHRONIZE = 0x100000 

580 DELETE = 0x00010000 

581 STANDARD_RIGHTS_REQUIRED = 0xF0000 

582 STANDARD_RIGHTS_READ = 0x20000 

583 STANDARD_RIGHTS_WRITE = 0x20000 

584 FILE_READ_DATA = 1 

585 FILE_READ_EA = 8 

586 FILE_READ_ATTRIBUTES = 128 

587 FILE_WRITE_DATA = 2 

588 FILE_APPEND_DATA = 4 

589 FILE_WRITE_EA = 16 

590 FILE_WRITE_ATTRIBUTES = 256 

591 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF 

592 FILE_GENERIC_READ = ( 

593 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE 

594 ) 

595 FILE_GENERIC_WRITE = ( 

596 STANDARD_RIGHTS_WRITE 

597 | FILE_WRITE_DATA 

598 | FILE_WRITE_ATTRIBUTES 

599 | FILE_WRITE_EA 

600 | FILE_APPEND_DATA 

601 | SYNCHRONIZE 

602 ) 

603 

604 class ACL(ctypes.Structure): 

605 _fields_ = [ 

606 ("AclRevision", wintypes.BYTE), 

607 ("Sbz1", wintypes.BYTE), 

608 ("AclSize", wintypes.WORD), 

609 ("AceCount", wintypes.WORD), 

610 ("Sbz2", wintypes.WORD), 

611 ] 

612 

613 PSID = ctypes.c_void_p 

614 PACL = ctypes.POINTER(ACL) 

615 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE) 

616 

617 def _nonzero_success(result, func, args): 

618 if not result: 

619 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined] 

620 return args 

621 

622 secur32.GetUserNameExW.errcheck = _nonzero_success 

623 secur32.GetUserNameExW.restype = wintypes.BOOL 

624 secur32.GetUserNameExW.argtypes = ( 

625 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat 

626 wintypes.LPWSTR, # LPWSTR lpNameBuffer, 

627 wintypes.PULONG, # PULONG nSize 

628 ) 

629 

630 advapi32.CreateWellKnownSid.errcheck = _nonzero_success 

631 advapi32.CreateWellKnownSid.restype = wintypes.BOOL 

632 advapi32.CreateWellKnownSid.argtypes = ( 

633 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType 

634 PSID, # PSID DomainSid 

635 PSID, # PSID pSid 

636 wintypes.PDWORD, # DWORD *cbSid 

637 ) 

638 

639 advapi32.LookupAccountNameW.errcheck = _nonzero_success 

640 advapi32.LookupAccountNameW.restype = wintypes.BOOL 

641 advapi32.LookupAccountNameW.argtypes = ( 

642 wintypes.LPWSTR, # LPCWSTR lpSystemName 

643 wintypes.LPWSTR, # LPCWSTR lpAccountName 

644 PSID, # PSID Sid 

645 wintypes.LPDWORD, # LPDWORD cbSid 

646 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName 

647 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName 

648 wintypes.LPDWORD, # PSID_NAME_USE peUse 

649 ) 

650 

651 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success 

652 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL 

653 advapi32.AddAccessAllowedAce.argtypes = ( 

654 PACL, # PACL pAcl 

655 wintypes.DWORD, # DWORD dwAceRevision 

656 wintypes.DWORD, # DWORD AccessMask 

657 PSID, # PSID pSid 

658 ) 

659 

660 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success 

661 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL 

662 advapi32.SetSecurityDescriptorDacl.argtypes = ( 

663 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

664 wintypes.BOOL, # BOOL bDaclPresent 

665 PACL, # PACL pDacl 

666 wintypes.BOOL, # BOOL bDaclDefaulted 

667 ) 

668 

669 advapi32.GetFileSecurityW.errcheck = _nonzero_success 

670 advapi32.GetFileSecurityW.restype = wintypes.BOOL 

671 advapi32.GetFileSecurityW.argtypes = ( 

672 wintypes.LPCWSTR, # LPCWSTR lpFileName 

673 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation 

674 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

675 wintypes.DWORD, # DWORD nLength 

676 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded 

677 ) 

678 

679 advapi32.SetFileSecurityW.errcheck = _nonzero_success 

680 advapi32.SetFileSecurityW.restype = wintypes.BOOL 

681 advapi32.SetFileSecurityW.argtypes = ( 

682 wintypes.LPCWSTR, # LPCWSTR lpFileName 

683 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation 

684 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

685 ) 

686 

687 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success 

688 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL 

689 advapi32.MakeAbsoluteSD.argtypes = ( 

690 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

691 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

692 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize 

693 PACL, # PACL pDacl 

694 wintypes.LPDWORD, # LPDWORD lpdwDaclSize 

695 PACL, # PACL pSacl 

696 wintypes.LPDWORD, # LPDWORD lpdwSaclSize 

697 PSID, # PSID pOwner 

698 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize 

699 PSID, # PSID pPrimaryGroup 

700 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize 

701 ) 

702 

703 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success 

704 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL 

705 advapi32.MakeSelfRelativeSD.argtypes = ( 

706 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

707 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

708 wintypes.LPDWORD, # LPDWORD lpdwBufferLength 

709 ) 

710 

711 advapi32.InitializeAcl.errcheck = _nonzero_success 

712 advapi32.InitializeAcl.restype = wintypes.BOOL 

713 advapi32.InitializeAcl.argtypes = ( 

714 PACL, # PACL pAcl, 

715 wintypes.DWORD, # DWORD nAclLength, 

716 wintypes.DWORD, # DWORD dwAclRevision 

717 ) 

718 

719 def CreateWellKnownSid(WellKnownSidType): 

720 # return a SID for predefined aliases 

721 pSid = (ctypes.c_char * 1)() 

722 cbSid = wintypes.DWORD() 

723 try: 

724 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

725 except OSError as e: 

726 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

727 raise 

728 pSid = (ctypes.c_char * cbSid.value)() 

729 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

730 return pSid[:] 

731 

732 def GetUserNameEx(NameFormat): 

733 # return the user or other security principal associated with 

734 # the calling thread 

735 nSize = ctypes.pointer(ctypes.c_ulong(0)) 

736 try: 

737 secur32.GetUserNameExW(NameFormat, None, nSize) 

738 except OSError as e: 

739 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined] 

740 raise 

741 if not nSize.contents.value: 

742 return None 

743 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value) 

744 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize) 

745 return lpNameBuffer.value 

746 

747 def LookupAccountName(lpSystemName, lpAccountName): 

748 # return a security identifier (SID) for an account on a system 

749 # and the name of the domain on which the account was found 

750 cbSid = wintypes.DWORD(0) 

751 cchReferencedDomainName = wintypes.DWORD(0) 

752 peUse = wintypes.DWORD(0) 

753 try: 

754 advapi32.LookupAccountNameW( 

755 lpSystemName, 

756 lpAccountName, 

757 None, 

758 ctypes.byref(cbSid), 

759 None, 

760 ctypes.byref(cchReferencedDomainName), 

761 ctypes.byref(peUse), 

762 ) 

763 except OSError as e: 

764 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

765 raise 

766 Sid = ctypes.create_unicode_buffer("", cbSid.value) 

767 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID) 

768 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1) 

769 success = advapi32.LookupAccountNameW( 

770 lpSystemName, 

771 lpAccountName, 

772 pSid, 

773 ctypes.byref(cbSid), 

774 lpReferencedDomainName, 

775 ctypes.byref(cchReferencedDomainName), 

776 ctypes.byref(peUse), 

777 ) 

778 if not success: 

779 raise ctypes.WinError() # type:ignore[attr-defined] 

780 return pSid, lpReferencedDomainName.value, peUse.value 

781 

782 def AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid): 

783 # add an access-allowed access control entry (ACE) 

784 # to an access control list (ACL) 

785 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid) 

786 

787 def GetFileSecurity(lpFileName, RequestedInformation): 

788 # return information about the security of a file or directory 

789 nLength = wintypes.DWORD(0) 

790 try: 

791 advapi32.GetFileSecurityW( 

792 lpFileName, 

793 RequestedInformation, 

794 None, 

795 0, 

796 ctypes.byref(nLength), 

797 ) 

798 except OSError as e: 

799 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

800 raise 

801 if not nLength.value: 

802 return None 

803 pSecurityDescriptor = (wintypes.BYTE * nLength.value)() 

804 advapi32.GetFileSecurityW( 

805 lpFileName, 

806 RequestedInformation, 

807 pSecurityDescriptor, 

808 nLength, 

809 ctypes.byref(nLength), 

810 ) 

811 return pSecurityDescriptor 

812 

813 def SetFileSecurity(lpFileName, RequestedInformation, pSecurityDescriptor): 

814 # set the security of a file or directory object 

815 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor) 

816 

817 def SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted): 

818 # set information in a discretionary access control list (DACL) 

819 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted) 

820 

821 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor): 

822 # return a security descriptor in absolute format 

823 # by using a security descriptor in self-relative format as a template 

824 pAbsoluteSecurityDescriptor = None 

825 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0) 

826 pDacl = None 

827 lpdwDaclSize = wintypes.DWORD(0) 

828 pSacl = None 

829 lpdwSaclSize = wintypes.DWORD(0) 

830 pOwner = None 

831 lpdwOwnerSize = wintypes.DWORD(0) 

832 pPrimaryGroup = None 

833 lpdwPrimaryGroupSize = wintypes.DWORD(0) 

834 try: 

835 advapi32.MakeAbsoluteSD( 

836 pSelfRelativeSecurityDescriptor, 

837 pAbsoluteSecurityDescriptor, 

838 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 

839 pDacl, 

840 ctypes.byref(lpdwDaclSize), 

841 pSacl, 

842 ctypes.byref(lpdwSaclSize), 

843 pOwner, 

844 ctypes.byref(lpdwOwnerSize), 

845 pPrimaryGroup, 

846 ctypes.byref(lpdwPrimaryGroupSize), 

847 ) 

848 except OSError as e: 

849 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

850 raise 

851 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)() 

852 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)() 

853 pDacl = ctypes.cast(pDaclData, PACL).contents 

854 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)() 

855 pSacl = ctypes.cast(pSaclData, PACL).contents 

856 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)() 

857 pOwner = ctypes.cast(pOwnerData, PSID) 

858 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)() 

859 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID) 

860 advapi32.MakeAbsoluteSD( 

861 pSelfRelativeSecurityDescriptor, 

862 pAbsoluteSecurityDescriptor, 

863 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 

864 pDacl, 

865 ctypes.byref(lpdwDaclSize), 

866 pSacl, 

867 ctypes.byref(lpdwSaclSize), 

868 pOwner, 

869 lpdwOwnerSize, 

870 pPrimaryGroup, 

871 ctypes.byref(lpdwPrimaryGroupSize), 

872 ) 

873 return pAbsoluteSecurityDescriptor 

874 

875 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor): 

876 # return a security descriptor in self-relative format 

877 # by using a security descriptor in absolute format as a template 

878 pSelfRelativeSecurityDescriptor = None 

879 lpdwBufferLength = wintypes.DWORD(0) 

880 try: 

881 advapi32.MakeSelfRelativeSD( 

882 pAbsoluteSecurityDescriptor, 

883 pSelfRelativeSecurityDescriptor, 

884 ctypes.byref(lpdwBufferLength), 

885 ) 

886 except OSError as e: 

887 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

888 raise 

889 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)() 

890 advapi32.MakeSelfRelativeSD( 

891 pAbsoluteSecurityDescriptor, 

892 pSelfRelativeSecurityDescriptor, 

893 ctypes.byref(lpdwBufferLength), 

894 ) 

895 return pSelfRelativeSecurityDescriptor 

896 

897 def NewAcl(): 

898 # return a new, initialized ACL (access control list) structure 

899 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ? 

900 acl_data = ctypes.create_string_buffer(nAclLength) 

901 pAcl = ctypes.cast(acl_data, PACL).contents 

902 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION) 

903 return pAcl 

904 

905 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid) 

906 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0] 

907 

908 Acl = NewAcl() 

909 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins) 

910 AddAccessAllowedAce( 

911 Acl, 

912 ACL_REVISION, 

913 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE, 

914 SidUser, 

915 ) 

916 

917 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION) 

918 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD) 

919 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0) 

920 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD) 

921 

922 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD) 

923 

924 

925def get_file_mode(fname: str) -> int: 

926 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner. 

927 

928 Parameters 

929 ---------- 

930 

931 fname : unicode 

932 The path to the file to get mode from 

933 

934 """ 

935 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we 

936 # should tolerate the execute bit on the file's owner when validating permissions - thus 

937 # the missing least significant bit on the third octal digit. In addition, we also tolerate 

938 # the sticky bit being set, so the lsb from the fourth octal digit is also removed. 

939 return ( 

940 stat.S_IMODE(os.stat(fname).st_mode) & 0o6677 

941 ) # Use 4 octal digits since S_IMODE does the same 

942 

943 

944allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1") 

945 

946 

947@contextmanager 

948def secure_write(fname: str, binary: bool = False) -> Iterator[Any]: 

949 """Opens a file in the most restricted pattern available for 

950 writing content. This limits the file mode to `0o0600` and yields 

951 the resulting opened filed handle. 

952 

953 Parameters 

954 ---------- 

955 

956 fname : unicode 

957 The path to the file to write 

958 

959 binary: boolean 

960 Indicates that the file is binary 

961 """ 

962 mode = "wb" if binary else "w" 

963 encoding = None if binary else "utf-8" 

964 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC 

965 try: 

966 os.remove(fname) 

967 except OSError: 

968 # Skip any issues with the file not existing 

969 pass 

970 

971 if os.name == "nt": 

972 if allow_insecure_writes: 

973 # Mounted file systems can have a number of failure modes inside this block. 

974 # For windows machines in insecure mode we simply skip this to avoid failures :/ 

975 issue_insecure_write_warning() 

976 else: 

977 # Python on windows does not respect the group and public bits for chmod, so we need 

978 # to take additional steps to secure the contents. 

979 # Touch file pre-emptively to avoid editing permissions in open files in Windows 

980 fd = os.open(fname, open_flag, 0o0600) 

981 os.close(fd) 

982 open_flag = os.O_WRONLY | os.O_TRUNC 

983 win32_restrict_file_to_user(fname) 

984 

985 with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f: 

986 if os.name != "nt": 

987 # Enforce that the file got the requested permissions before writing 

988 file_mode = get_file_mode(fname) 

989 if file_mode != 0o0600: # noqa 

990 if allow_insecure_writes: 

991 issue_insecure_write_warning() 

992 else: 

993 msg = ( 

994 "Permissions assignment failed for secure file: '{file}'." 

995 " Got '{permissions}' instead of '0o0600'.".format( 

996 file=fname, permissions=oct(file_mode) 

997 ) 

998 ) 

999 raise RuntimeError(msg) 

1000 yield f 

1001 

1002 

1003def issue_insecure_write_warning() -> None: 

1004 """Issue an insecure write warning.""" 

1005 

1006 def format_warning(msg, *args, **kwargs): 

1007 return str(msg) + "\n" 

1008 

1009 warnings.formatwarning = format_warning # type:ignore[assignment] 

1010 warnings.warn( 

1011 "WARNING: Insecure writes have been enabled via environment variable " 

1012 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the " 

1013 "variable or set its value to 'False'." 

1014 )