Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_core/paths.py: 18%

440 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""Path utility functions.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5 

6# Derived from IPython.utils.path, which is 

7# Copyright (c) IPython Development Team. 

8# Distributed under the terms of the Modified BSD License. 

9 

10 

11import errno 

12import os 

13import site 

14import stat 

15import sys 

16import tempfile 

17import warnings 

18from contextlib import contextmanager 

19from pathlib import Path 

20from typing import Any, Dict, Iterator, List, Optional 

21 

22import platformdirs 

23 

24from .utils import deprecation 

25 

26pjoin = os.path.join 

27 

28# Capitalize Jupyter in paths only on Windows and MacOS 

29APPNAME = "Jupyter" if sys.platform in ("win32", "darwin") else "jupyter" 

30 

31# UF_HIDDEN is a stat flag not defined in the stat module. 

32# It is used by BSD to indicate hidden files. 

33UF_HIDDEN = getattr(stat, "UF_HIDDEN", 32768) 

34 

35 

36def envset(name: str, default: Optional[bool] = False) -> Optional[bool]: 

37 """Return the boolean value of a given environment variable. 

38 

39 An environment variable is considered set if it is assigned to a value 

40 other than 'no', 'n', 'false', 'off', '0', or '0.0' (case insensitive) 

41 

42 If the environment variable is not defined, the default value is returned. 

43 """ 

44 if name not in os.environ: 

45 return default 

46 

47 return os.environ[name].lower() not in ["no", "n", "false", "off", "0", "0.0"] 

48 

49 

50def use_platform_dirs() -> bool: 

51 """Determine if platformdirs should be used for system-specific paths. 

52 

53 We plan for this to default to False in jupyter_core version 5 and to True 

54 in jupyter_core version 6. 

55 """ 

56 return envset("JUPYTER_PLATFORM_DIRS", False) # type:ignore[return-value] 

57 

58 

59def get_home_dir() -> str: 

60 """Get the real path of the home directory""" 

61 homedir = os.path.expanduser("~") 

62 # Next line will make things work even when /home/ is a symlink to 

63 # /usr/home as it is on FreeBSD, for example 

64 homedir = str(Path(homedir).resolve()) 

65 return homedir 

66 

67 

68_dtemps: Dict[str, str] = {} 

69 

70 

71def _do_i_own(path: str) -> bool: 

72 """Return whether the current user owns the given path""" 

73 p = Path(path).resolve() 

74 

75 # walk up to first existing parent 

76 while not p.exists() and p != p.parent: 

77 p = p.parent 

78 

79 # simplest check: owner by name 

80 # not always implemented or available 

81 try: 

82 return p.owner() == os.getlogin() 

83 except Exception: # noqa 

84 pass 

85 

86 if hasattr(os, 'geteuid'): 

87 try: 

88 st = p.stat() 

89 return st.st_uid == os.geteuid() 

90 except (NotImplementedError, OSError): 

91 # geteuid not always implemented 

92 pass 

93 

94 # no ownership checks worked, check write access 

95 return os.access(p, os.W_OK) 

96 

97 

98def prefer_environment_over_user() -> bool: 

99 """Determine if environment-level paths should take precedence over user-level paths.""" 

100 # If JUPYTER_PREFER_ENV_PATH is defined, that signals user intent, so return its value 

101 if "JUPYTER_PREFER_ENV_PATH" in os.environ: 

102 return envset("JUPYTER_PREFER_ENV_PATH") # type:ignore[return-value] 

103 

104 # If we are in a Python virtualenv, default to True (see https://docs.python.org/3/library/venv.html#venv-def) 

105 if sys.prefix != sys.base_prefix and _do_i_own(sys.prefix): 

106 return True 

107 

108 # If sys.prefix indicates Python comes from a conda/mamba environment that is not the root environment, default to True 

109 if ( 

110 "CONDA_PREFIX" in os.environ 

111 and sys.prefix.startswith(os.environ["CONDA_PREFIX"]) 

112 and os.environ.get("CONDA_DEFAULT_ENV", "base") != "base" 

113 and _do_i_own(sys.prefix) 

114 ): 

115 return True 

116 

117 return False 

118 

119 

120def _mkdtemp_once(name: str) -> str: 

121 """Make or reuse a temporary directory. 

122 

123 If this is called with the same name in the same process, it will return 

124 the same directory. 

125 """ 

126 try: 

127 return _dtemps[name] 

128 except KeyError: 

129 d = _dtemps[name] = tempfile.mkdtemp(prefix=name + "-") 

130 return d 

131 

132 

133def jupyter_config_dir() -> str: 

134 """Get the Jupyter config directory for this platform and user. 

135 

136 Returns JUPYTER_CONFIG_DIR if defined, otherwise the appropriate 

137 directory for the platform. 

138 """ 

139 

140 env = os.environ 

141 if env.get("JUPYTER_NO_CONFIG"): 

142 return _mkdtemp_once("jupyter-clean-cfg") 

143 

144 if env.get("JUPYTER_CONFIG_DIR"): 

145 return env["JUPYTER_CONFIG_DIR"] 

146 

147 if use_platform_dirs(): 

148 return platformdirs.user_config_dir(APPNAME, appauthor=False) 

149 

150 home_dir = get_home_dir() 

151 return pjoin(home_dir, ".jupyter") 

152 

153 

154def jupyter_data_dir() -> str: 

155 """Get the config directory for Jupyter data files for this platform and user. 

156 

157 These are non-transient, non-configuration files. 

158 

159 Returns JUPYTER_DATA_DIR if defined, else a platform-appropriate path. 

160 """ 

161 env = os.environ 

162 

163 if env.get("JUPYTER_DATA_DIR"): 

164 return env["JUPYTER_DATA_DIR"] 

165 

166 if use_platform_dirs(): 

167 return platformdirs.user_data_dir(APPNAME, appauthor=False) 

168 

169 home = get_home_dir() 

170 

171 if sys.platform == "darwin": 

172 return os.path.join(home, "Library", "Jupyter") 

173 elif os.name == "nt": 

174 appdata = os.environ.get("APPDATA", None) 

175 if appdata: 

176 return str(Path(appdata, "jupyter").resolve()) 

177 else: 

178 return pjoin(jupyter_config_dir(), "data") 

179 else: 

180 # Linux, non-OS X Unix, AIX, etc. 

181 xdg = env.get("XDG_DATA_HOME", None) 

182 if not xdg: 

183 xdg = pjoin(home, ".local", "share") 

184 return pjoin(xdg, "jupyter") 

185 

186 

187def jupyter_runtime_dir() -> str: 

188 """Return the runtime dir for transient jupyter files. 

189 

190 Returns JUPYTER_RUNTIME_DIR if defined. 

191 

192 The default is now (data_dir)/runtime on all platforms; 

193 we no longer use XDG_RUNTIME_DIR after various problems. 

194 """ 

195 env = os.environ 

196 

197 if env.get("JUPYTER_RUNTIME_DIR"): 

198 return env["JUPYTER_RUNTIME_DIR"] 

199 

200 return pjoin(jupyter_data_dir(), "runtime") 

201 

202 

203if use_platform_dirs(): 

204 SYSTEM_JUPYTER_PATH = platformdirs.site_data_dir( 

205 APPNAME, appauthor=False, multipath=True 

206 ).split(os.pathsep) 

207else: 

208 deprecation( 

209 "Jupyter is migrating its paths to use standard platformdirs\n" 

210 "given by the platformdirs library. To remove this warning and\n" 

211 "see the appropriate new directories, set the environment variable\n" 

212 "`JUPYTER_PLATFORM_DIRS=1` and then run `jupyter --paths`.\n" 

213 "The use of platformdirs will be the default in `jupyter_core` v6" 

214 ) 

215 if os.name == "nt": 

216 programdata = os.environ.get("PROGRAMDATA", None) 

217 if programdata: 

218 SYSTEM_JUPYTER_PATH = [pjoin(programdata, "jupyter")] 

219 else: # PROGRAMDATA is not defined by default on XP. 

220 SYSTEM_JUPYTER_PATH = [os.path.join(sys.prefix, "share", "jupyter")] 

221 else: 

222 SYSTEM_JUPYTER_PATH = [ 

223 "/usr/local/share/jupyter", 

224 "/usr/share/jupyter", 

225 ] 

226 

227ENV_JUPYTER_PATH: List[str] = [os.path.join(sys.prefix, "share", "jupyter")] 

228 

229 

230def jupyter_path(*subdirs: str) -> List[str]: 

231 """Return a list of directories to search for data files 

232 

233 JUPYTER_PATH environment variable has highest priority. 

234 

235 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the environment-level 

236 directories will have priority over user-level directories. 

237 

238 If the Python site.ENABLE_USER_SITE variable is True, we also add the 

239 appropriate Python user site subdirectory to the user-level directories. 

240 

241 

242 If ``*subdirs`` are given, that subdirectory will be added to each element. 

243 

244 Examples: 

245 

246 >>> jupyter_path() 

247 ['~/.local/jupyter', '/usr/local/share/jupyter'] 

248 >>> jupyter_path('kernels') 

249 ['~/.local/jupyter/kernels', '/usr/local/share/jupyter/kernels'] 

250 """ 

251 

252 paths: List[str] = [] 

253 

254 # highest priority is explicit environment variable 

255 if os.environ.get("JUPYTER_PATH"): 

256 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_PATH"].split(os.pathsep)) 

257 

258 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag 

259 user = [jupyter_data_dir()] 

260 if site.ENABLE_USER_SITE: 

261 # Check if site.getuserbase() exists to be compatible with virtualenv, 

262 # which often does not have this method. 

263 userbase: Optional[str] 

264 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE 

265 

266 if userbase: 

267 userdir = os.path.join(userbase, "share", "jupyter") 

268 if userdir not in user: 

269 user.append(userdir) 

270 

271 env = [p for p in ENV_JUPYTER_PATH if p not in SYSTEM_JUPYTER_PATH] 

272 

273 if prefer_environment_over_user(): 

274 paths.extend(env) 

275 paths.extend(user) 

276 else: 

277 paths.extend(user) 

278 paths.extend(env) 

279 

280 # finally, system 

281 paths.extend(SYSTEM_JUPYTER_PATH) 

282 

283 # add subdir, if requested 

284 if subdirs: 

285 paths = [pjoin(p, *subdirs) for p in paths] 

286 return paths 

287 

288 

289if use_platform_dirs(): 

290 SYSTEM_CONFIG_PATH = platformdirs.site_config_dir( 

291 APPNAME, appauthor=False, multipath=True 

292 ).split(os.pathsep) 

293else: 

294 if os.name == "nt": # noqa 

295 programdata = os.environ.get("PROGRAMDATA", None) 

296 if programdata: # noqa 

297 SYSTEM_CONFIG_PATH = [os.path.join(programdata, "jupyter")] 

298 else: # PROGRAMDATA is not defined by default on XP. 

299 SYSTEM_CONFIG_PATH = [] 

300 else: 

301 SYSTEM_CONFIG_PATH = [ 

302 "/usr/local/etc/jupyter", 

303 "/etc/jupyter", 

304 ] 

305ENV_CONFIG_PATH: List[str] = [os.path.join(sys.prefix, "etc", "jupyter")] 

306 

307 

308def jupyter_config_path() -> List[str]: 

309 """Return the search path for Jupyter config files as a list. 

310 

311 If the JUPYTER_PREFER_ENV_PATH environment variable is set, the 

312 environment-level directories will have priority over user-level 

313 directories. 

314 

315 If the Python site.ENABLE_USER_SITE variable is True, we also add the 

316 appropriate Python user site subdirectory to the user-level directories. 

317 """ 

318 if os.environ.get("JUPYTER_NO_CONFIG"): 

319 # jupyter_config_dir makes a blank config when JUPYTER_NO_CONFIG is set. 

320 return [jupyter_config_dir()] 

321 

322 paths: List[str] = [] 

323 

324 # highest priority is explicit environment variable 

325 if os.environ.get("JUPYTER_CONFIG_PATH"): 

326 paths.extend(p.rstrip(os.sep) for p in os.environ["JUPYTER_CONFIG_PATH"].split(os.pathsep)) 

327 

328 # Next is environment or user, depending on the JUPYTER_PREFER_ENV_PATH flag 

329 user = [jupyter_config_dir()] 

330 if site.ENABLE_USER_SITE: 

331 userbase: Optional[str] 

332 # Check if site.getuserbase() exists to be compatible with virtualenv, 

333 # which often does not have this method. 

334 userbase = site.getuserbase() if hasattr(site, "getuserbase") else site.USER_BASE 

335 

336 if userbase: 

337 userdir = os.path.join(userbase, "etc", "jupyter") 

338 if userdir not in user: 

339 user.append(userdir) 

340 

341 env = [p for p in ENV_CONFIG_PATH if p not in SYSTEM_CONFIG_PATH] 

342 

343 if prefer_environment_over_user(): 

344 paths.extend(env) 

345 paths.extend(user) 

346 else: 

347 paths.extend(user) 

348 paths.extend(env) 

349 

350 # Finally, system path 

351 paths.extend(SYSTEM_CONFIG_PATH) 

352 return paths 

353 

354 

355def exists(path: str) -> bool: 

356 """Replacement for `os.path.exists` which works for host mapped volumes 

357 on Windows containers 

358 """ 

359 try: 

360 os.lstat(path) 

361 except OSError: 

362 return False 

363 return True 

364 

365 

366def is_file_hidden_win(abs_path: str, stat_res: Optional[Any] = None) -> bool: 

367 """Is a file hidden? 

368 

369 This only checks the file itself; it should be called in combination with 

370 checking the directory containing the file. 

371 

372 Use is_hidden() instead to check the file and its parent directories. 

373 

374 Parameters 

375 ---------- 

376 abs_path : unicode 

377 The absolute path to check. 

378 stat_res : os.stat_result, optional 

379 The result of calling stat() on abs_path. If not passed, this function 

380 will call stat() internally. 

381 """ 

382 if os.path.basename(abs_path).startswith("."): 

383 return True 

384 

385 if stat_res is None: 

386 try: 

387 stat_res = os.stat(abs_path) 

388 except OSError as e: 

389 if e.errno == errno.ENOENT: 

390 return False 

391 raise 

392 

393 try: 

394 if stat_res.st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN: # type:ignore 

395 return True 

396 except AttributeError: 

397 # allow AttributeError on PyPy for Windows 

398 # 'stat_result' object has no attribute 'st_file_attributes' 

399 # https://foss.heptapod.net/pypy/pypy/-/issues/3469 

400 warnings.warn( 

401 "hidden files are not detectable on this system, so no file will be marked as hidden.", 

402 stacklevel=2, 

403 ) 

404 pass 

405 

406 return False 

407 

408 

409def is_file_hidden_posix(abs_path: str, stat_res: Optional[Any] = None) -> bool: 

410 """Is a file hidden? 

411 

412 This only checks the file itself; it should be called in combination with 

413 checking the directory containing the file. 

414 

415 Use is_hidden() instead to check the file and its parent directories. 

416 

417 Parameters 

418 ---------- 

419 abs_path : unicode 

420 The absolute path to check. 

421 stat_res : os.stat_result, optional 

422 The result of calling stat() on abs_path. If not passed, this function 

423 will call stat() internally. 

424 """ 

425 if os.path.basename(abs_path).startswith("."): 

426 return True 

427 

428 if stat_res is None or stat.S_ISLNK(stat_res.st_mode): 

429 try: 

430 stat_res = os.stat(abs_path) 

431 except OSError as e: 

432 if e.errno == errno.ENOENT: 

433 return False 

434 raise 

435 

436 # check that dirs can be listed 

437 if stat.S_ISDIR(stat_res.st_mode): # type:ignore[misc] # noqa 

438 # use x-access, not actual listing, in case of slow/large listings 

439 if not os.access(abs_path, os.X_OK | os.R_OK): 

440 return True 

441 

442 # check UF_HIDDEN 

443 if getattr(stat_res, "st_flags", 0) & UF_HIDDEN: 

444 return True 

445 

446 return False 

447 

448 

449if sys.platform == "win32": 

450 is_file_hidden = is_file_hidden_win 

451else: 

452 is_file_hidden = is_file_hidden_posix 

453 

454 

455def is_hidden(abs_path: str, abs_root: str = "") -> bool: 

456 """Is a file hidden or contained in a hidden directory? 

457 

458 This will start with the rightmost path element and work backwards to the 

459 given root to see if a path is hidden or in a hidden directory. Hidden is 

460 determined by either name starting with '.' or the UF_HIDDEN flag as 

461 reported by stat. 

462 

463 If abs_path is the same directory as abs_root, it will be visible even if 

464 that is a hidden folder. This only checks the visibility of files 

465 and directories *within* abs_root. 

466 

467 Parameters 

468 ---------- 

469 abs_path : unicode 

470 The absolute path to check for hidden directories. 

471 abs_root : unicode 

472 The absolute path of the root directory in which hidden directories 

473 should be checked for. 

474 """ 

475 abs_path = os.path.normpath(abs_path) 

476 abs_root = os.path.normpath(abs_root) 

477 

478 if abs_path == abs_root: 

479 return False 

480 

481 if is_file_hidden(abs_path): 

482 return True 

483 

484 if not abs_root: 

485 abs_root = abs_path.split(os.sep, 1)[0] + os.sep 

486 inside_root = abs_path[len(abs_root) :] 

487 if any(part.startswith(".") for part in inside_root.split(os.sep)): 

488 return True 

489 

490 # check UF_HIDDEN on any location up to root. 

491 # is_file_hidden() already checked the file, so start from its parent dir 

492 path = os.path.dirname(abs_path) 

493 while path and path.startswith(abs_root) and path != abs_root: 

494 if not exists(path): 

495 path = os.path.dirname(path) 

496 continue 

497 try: 

498 # may fail on Windows junctions 

499 st = os.lstat(path) 

500 except OSError: 

501 return True 

502 if getattr(st, "st_flags", 0) & UF_HIDDEN: 

503 return True 

504 path = os.path.dirname(path) 

505 

506 return False 

507 

508 

509def win32_restrict_file_to_user(fname: str) -> None: 

510 """Secure a windows file to read-only access for the user. 

511 Follows guidance from win32 library creator: 

512 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 

513 

514 This method should be executed against an already generated file which 

515 has no secrets written to it yet. 

516 

517 Parameters 

518 ---------- 

519 

520 fname : unicode 

521 The path to the file to secure 

522 """ 

523 try: 

524 import win32api 

525 except ImportError: 

526 return _win32_restrict_file_to_user_ctypes(fname) 

527 

528 import ntsecuritycon as con 

529 import win32security 

530 

531 # everyone, _domain, _type = win32security.LookupAccountName("", "Everyone") 

532 admins = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid) 

533 user, _domain, _type = win32security.LookupAccountName( 

534 "", win32api.GetUserNameEx(win32api.NameSamCompatible) 

535 ) 

536 

537 sd = win32security.GetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION) 

538 

539 dacl = win32security.ACL() 

540 # dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, everyone) 

541 dacl.AddAccessAllowedAce( 

542 win32security.ACL_REVISION, 

543 con.FILE_GENERIC_READ | con.FILE_GENERIC_WRITE | con.DELETE, 

544 user, 

545 ) 

546 dacl.AddAccessAllowedAce(win32security.ACL_REVISION, con.FILE_ALL_ACCESS, admins) 

547 

548 sd.SetSecurityDescriptorDacl(1, dacl, 0) 

549 win32security.SetFileSecurity(fname, win32security.DACL_SECURITY_INFORMATION, sd) 

550 

551 

552def _win32_restrict_file_to_user_ctypes(fname: str) -> None: # noqa 

553 """Secure a windows file to read-only access for the user. 

554 

555 Follows guidance from win32 library creator: 

556 http://timgolden.me.uk/python/win32_how_do_i/add-security-to-a-file.html 

557 

558 This method should be executed against an already generated file which 

559 has no secrets written to it yet. 

560 

561 Parameters 

562 ---------- 

563 

564 fname : unicode 

565 The path to the file to secure 

566 """ 

567 import ctypes 

568 from ctypes import wintypes 

569 

570 advapi32 = ctypes.WinDLL("advapi32", use_last_error=True) # type:ignore[attr-defined] 

571 secur32 = ctypes.WinDLL("secur32", use_last_error=True) # type:ignore[attr-defined] 

572 

573 NameSamCompatible = 2 

574 WinBuiltinAdministratorsSid = 26 

575 DACL_SECURITY_INFORMATION = 4 

576 ACL_REVISION = 2 

577 ERROR_INSUFFICIENT_BUFFER = 122 

578 ERROR_MORE_DATA = 234 

579 

580 SYNCHRONIZE = 0x100000 

581 DELETE = 0x00010000 

582 STANDARD_RIGHTS_REQUIRED = 0xF0000 

583 STANDARD_RIGHTS_READ = 0x20000 

584 STANDARD_RIGHTS_WRITE = 0x20000 

585 FILE_READ_DATA = 1 

586 FILE_READ_EA = 8 

587 FILE_READ_ATTRIBUTES = 128 

588 FILE_WRITE_DATA = 2 

589 FILE_APPEND_DATA = 4 

590 FILE_WRITE_EA = 16 

591 FILE_WRITE_ATTRIBUTES = 256 

592 FILE_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0x1FF 

593 FILE_GENERIC_READ = ( 

594 STANDARD_RIGHTS_READ | FILE_READ_DATA | FILE_READ_ATTRIBUTES | FILE_READ_EA | SYNCHRONIZE 

595 ) 

596 FILE_GENERIC_WRITE = ( 

597 STANDARD_RIGHTS_WRITE 

598 | FILE_WRITE_DATA 

599 | FILE_WRITE_ATTRIBUTES 

600 | FILE_WRITE_EA 

601 | FILE_APPEND_DATA 

602 | SYNCHRONIZE 

603 ) 

604 

605 class ACL(ctypes.Structure): 

606 _fields_ = [ 

607 ("AclRevision", wintypes.BYTE), 

608 ("Sbz1", wintypes.BYTE), 

609 ("AclSize", wintypes.WORD), 

610 ("AceCount", wintypes.WORD), 

611 ("Sbz2", wintypes.WORD), 

612 ] 

613 

614 PSID = ctypes.c_void_p 

615 PACL = ctypes.POINTER(ACL) 

616 PSECURITY_DESCRIPTOR = ctypes.POINTER(wintypes.BYTE) 

617 

618 def _nonzero_success(result, func, args): 

619 if not result: 

620 raise ctypes.WinError(ctypes.get_last_error()) # type:ignore[attr-defined] 

621 return args 

622 

623 secur32.GetUserNameExW.errcheck = _nonzero_success 

624 secur32.GetUserNameExW.restype = wintypes.BOOL 

625 secur32.GetUserNameExW.argtypes = ( 

626 ctypes.c_int, # EXTENDED_NAME_FORMAT NameFormat 

627 wintypes.LPWSTR, # LPWSTR lpNameBuffer, 

628 wintypes.PULONG, # PULONG nSize 

629 ) 

630 

631 advapi32.CreateWellKnownSid.errcheck = _nonzero_success 

632 advapi32.CreateWellKnownSid.restype = wintypes.BOOL 

633 advapi32.CreateWellKnownSid.argtypes = ( 

634 wintypes.DWORD, # WELL_KNOWN_SID_TYPE WellKnownSidType 

635 PSID, # PSID DomainSid 

636 PSID, # PSID pSid 

637 wintypes.PDWORD, # DWORD *cbSid 

638 ) 

639 

640 advapi32.LookupAccountNameW.errcheck = _nonzero_success 

641 advapi32.LookupAccountNameW.restype = wintypes.BOOL 

642 advapi32.LookupAccountNameW.argtypes = ( 

643 wintypes.LPWSTR, # LPCWSTR lpSystemName 

644 wintypes.LPWSTR, # LPCWSTR lpAccountName 

645 PSID, # PSID Sid 

646 wintypes.LPDWORD, # LPDWORD cbSid 

647 wintypes.LPWSTR, # LPCWSTR ReferencedDomainName 

648 wintypes.LPDWORD, # LPDWORD cchReferencedDomainName 

649 wintypes.LPDWORD, # PSID_NAME_USE peUse 

650 ) 

651 

652 advapi32.AddAccessAllowedAce.errcheck = _nonzero_success 

653 advapi32.AddAccessAllowedAce.restype = wintypes.BOOL 

654 advapi32.AddAccessAllowedAce.argtypes = ( 

655 PACL, # PACL pAcl 

656 wintypes.DWORD, # DWORD dwAceRevision 

657 wintypes.DWORD, # DWORD AccessMask 

658 PSID, # PSID pSid 

659 ) 

660 

661 advapi32.SetSecurityDescriptorDacl.errcheck = _nonzero_success 

662 advapi32.SetSecurityDescriptorDacl.restype = wintypes.BOOL 

663 advapi32.SetSecurityDescriptorDacl.argtypes = ( 

664 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

665 wintypes.BOOL, # BOOL bDaclPresent 

666 PACL, # PACL pDacl 

667 wintypes.BOOL, # BOOL bDaclDefaulted 

668 ) 

669 

670 advapi32.GetFileSecurityW.errcheck = _nonzero_success 

671 advapi32.GetFileSecurityW.restype = wintypes.BOOL 

672 advapi32.GetFileSecurityW.argtypes = ( 

673 wintypes.LPCWSTR, # LPCWSTR lpFileName 

674 wintypes.DWORD, # SECURITY_INFORMATION RequestedInformation 

675 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

676 wintypes.DWORD, # DWORD nLength 

677 wintypes.LPDWORD, # LPDWORD lpnLengthNeeded 

678 ) 

679 

680 advapi32.SetFileSecurityW.errcheck = _nonzero_success 

681 advapi32.SetFileSecurityW.restype = wintypes.BOOL 

682 advapi32.SetFileSecurityW.argtypes = ( 

683 wintypes.LPCWSTR, # LPCWSTR lpFileName 

684 wintypes.DWORD, # SECURITY_INFORMATION SecurityInformation 

685 PSECURITY_DESCRIPTOR, # PSECURITY_DESCRIPTOR pSecurityDescriptor 

686 ) 

687 

688 advapi32.MakeAbsoluteSD.errcheck = _nonzero_success 

689 advapi32.MakeAbsoluteSD.restype = wintypes.BOOL 

690 advapi32.MakeAbsoluteSD.argtypes = ( 

691 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

692 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

693 wintypes.LPDWORD, # LPDWORD lpdwAbsoluteSecurityDescriptorSize 

694 PACL, # PACL pDacl 

695 wintypes.LPDWORD, # LPDWORD lpdwDaclSize 

696 PACL, # PACL pSacl 

697 wintypes.LPDWORD, # LPDWORD lpdwSaclSize 

698 PSID, # PSID pOwner 

699 wintypes.LPDWORD, # LPDWORD lpdwOwnerSize 

700 PSID, # PSID pPrimaryGroup 

701 wintypes.LPDWORD, # LPDWORD lpdwPrimaryGroupSize 

702 ) 

703 

704 advapi32.MakeSelfRelativeSD.errcheck = _nonzero_success 

705 advapi32.MakeSelfRelativeSD.restype = wintypes.BOOL 

706 advapi32.MakeSelfRelativeSD.argtypes = ( 

707 PSECURITY_DESCRIPTOR, # pAbsoluteSecurityDescriptor 

708 PSECURITY_DESCRIPTOR, # pSelfRelativeSecurityDescriptor 

709 wintypes.LPDWORD, # LPDWORD lpdwBufferLength 

710 ) 

711 

712 advapi32.InitializeAcl.errcheck = _nonzero_success 

713 advapi32.InitializeAcl.restype = wintypes.BOOL 

714 advapi32.InitializeAcl.argtypes = ( 

715 PACL, # PACL pAcl, 

716 wintypes.DWORD, # DWORD nAclLength, 

717 wintypes.DWORD, # DWORD dwAclRevision 

718 ) 

719 

720 def CreateWellKnownSid(WellKnownSidType): 

721 # return a SID for predefined aliases 

722 pSid = (ctypes.c_char * 1)() 

723 cbSid = wintypes.DWORD() 

724 try: 

725 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

726 except OSError as e: 

727 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

728 raise 

729 pSid = (ctypes.c_char * cbSid.value)() 

730 advapi32.CreateWellKnownSid(WellKnownSidType, None, pSid, ctypes.byref(cbSid)) 

731 return pSid[:] 

732 

733 def GetUserNameEx(NameFormat): 

734 # return the user or other security principal associated with 

735 # the calling thread 

736 nSize = ctypes.pointer(ctypes.c_ulong(0)) 

737 try: 

738 secur32.GetUserNameExW(NameFormat, None, nSize) 

739 except OSError as e: 

740 if e.winerror != ERROR_MORE_DATA: # type:ignore[attr-defined] 

741 raise 

742 if not nSize.contents.value: 

743 return None 

744 lpNameBuffer = ctypes.create_unicode_buffer(nSize.contents.value) 

745 secur32.GetUserNameExW(NameFormat, lpNameBuffer, nSize) 

746 return lpNameBuffer.value 

747 

748 def LookupAccountName(lpSystemName, lpAccountName): 

749 # return a security identifier (SID) for an account on a system 

750 # and the name of the domain on which the account was found 

751 cbSid = wintypes.DWORD(0) 

752 cchReferencedDomainName = wintypes.DWORD(0) 

753 peUse = wintypes.DWORD(0) 

754 try: 

755 advapi32.LookupAccountNameW( 

756 lpSystemName, 

757 lpAccountName, 

758 None, 

759 ctypes.byref(cbSid), 

760 None, 

761 ctypes.byref(cchReferencedDomainName), 

762 ctypes.byref(peUse), 

763 ) 

764 except OSError as e: 

765 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

766 raise 

767 Sid = ctypes.create_unicode_buffer("", cbSid.value) 

768 pSid = ctypes.cast(ctypes.pointer(Sid), wintypes.LPVOID) 

769 lpReferencedDomainName = ctypes.create_unicode_buffer("", cchReferencedDomainName.value + 1) 

770 success = advapi32.LookupAccountNameW( 

771 lpSystemName, 

772 lpAccountName, 

773 pSid, 

774 ctypes.byref(cbSid), 

775 lpReferencedDomainName, 

776 ctypes.byref(cchReferencedDomainName), 

777 ctypes.byref(peUse), 

778 ) 

779 if not success: 

780 raise ctypes.WinError() # type:ignore[attr-defined] 

781 return pSid, lpReferencedDomainName.value, peUse.value 

782 

783 def AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid): 

784 # add an access-allowed access control entry (ACE) 

785 # to an access control list (ACL) 

786 advapi32.AddAccessAllowedAce(pAcl, dwAceRevision, AccessMask, pSid) 

787 

788 def GetFileSecurity(lpFileName, RequestedInformation): 

789 # return information about the security of a file or directory 

790 nLength = wintypes.DWORD(0) 

791 try: 

792 advapi32.GetFileSecurityW( 

793 lpFileName, 

794 RequestedInformation, 

795 None, 

796 0, 

797 ctypes.byref(nLength), 

798 ) 

799 except OSError as e: 

800 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

801 raise 

802 if not nLength.value: 

803 return None 

804 pSecurityDescriptor = (wintypes.BYTE * nLength.value)() 

805 advapi32.GetFileSecurityW( 

806 lpFileName, 

807 RequestedInformation, 

808 pSecurityDescriptor, 

809 nLength, 

810 ctypes.byref(nLength), 

811 ) 

812 return pSecurityDescriptor 

813 

814 def SetFileSecurity(lpFileName, RequestedInformation, pSecurityDescriptor): 

815 # set the security of a file or directory object 

816 advapi32.SetFileSecurityW(lpFileName, RequestedInformation, pSecurityDescriptor) 

817 

818 def SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted): 

819 # set information in a discretionary access control list (DACL) 

820 advapi32.SetSecurityDescriptorDacl(pSecurityDescriptor, bDaclPresent, pDacl, bDaclDefaulted) 

821 

822 def MakeAbsoluteSD(pSelfRelativeSecurityDescriptor): 

823 # return a security descriptor in absolute format 

824 # by using a security descriptor in self-relative format as a template 

825 pAbsoluteSecurityDescriptor = None 

826 lpdwAbsoluteSecurityDescriptorSize = wintypes.DWORD(0) 

827 pDacl = None 

828 lpdwDaclSize = wintypes.DWORD(0) 

829 pSacl = None 

830 lpdwSaclSize = wintypes.DWORD(0) 

831 pOwner = None 

832 lpdwOwnerSize = wintypes.DWORD(0) 

833 pPrimaryGroup = None 

834 lpdwPrimaryGroupSize = wintypes.DWORD(0) 

835 try: 

836 advapi32.MakeAbsoluteSD( 

837 pSelfRelativeSecurityDescriptor, 

838 pAbsoluteSecurityDescriptor, 

839 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 

840 pDacl, 

841 ctypes.byref(lpdwDaclSize), 

842 pSacl, 

843 ctypes.byref(lpdwSaclSize), 

844 pOwner, 

845 ctypes.byref(lpdwOwnerSize), 

846 pPrimaryGroup, 

847 ctypes.byref(lpdwPrimaryGroupSize), 

848 ) 

849 except OSError as e: 

850 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

851 raise 

852 pAbsoluteSecurityDescriptor = (wintypes.BYTE * lpdwAbsoluteSecurityDescriptorSize.value)() 

853 pDaclData = (wintypes.BYTE * lpdwDaclSize.value)() 

854 pDacl = ctypes.cast(pDaclData, PACL).contents 

855 pSaclData = (wintypes.BYTE * lpdwSaclSize.value)() 

856 pSacl = ctypes.cast(pSaclData, PACL).contents 

857 pOwnerData = (wintypes.BYTE * lpdwOwnerSize.value)() 

858 pOwner = ctypes.cast(pOwnerData, PSID) 

859 pPrimaryGroupData = (wintypes.BYTE * lpdwPrimaryGroupSize.value)() 

860 pPrimaryGroup = ctypes.cast(pPrimaryGroupData, PSID) 

861 advapi32.MakeAbsoluteSD( 

862 pSelfRelativeSecurityDescriptor, 

863 pAbsoluteSecurityDescriptor, 

864 ctypes.byref(lpdwAbsoluteSecurityDescriptorSize), 

865 pDacl, 

866 ctypes.byref(lpdwDaclSize), 

867 pSacl, 

868 ctypes.byref(lpdwSaclSize), 

869 pOwner, 

870 lpdwOwnerSize, 

871 pPrimaryGroup, 

872 ctypes.byref(lpdwPrimaryGroupSize), 

873 ) 

874 return pAbsoluteSecurityDescriptor 

875 

876 def MakeSelfRelativeSD(pAbsoluteSecurityDescriptor): 

877 # return a security descriptor in self-relative format 

878 # by using a security descriptor in absolute format as a template 

879 pSelfRelativeSecurityDescriptor = None 

880 lpdwBufferLength = wintypes.DWORD(0) 

881 try: 

882 advapi32.MakeSelfRelativeSD( 

883 pAbsoluteSecurityDescriptor, 

884 pSelfRelativeSecurityDescriptor, 

885 ctypes.byref(lpdwBufferLength), 

886 ) 

887 except OSError as e: 

888 if e.winerror != ERROR_INSUFFICIENT_BUFFER: # type:ignore[attr-defined] 

889 raise 

890 pSelfRelativeSecurityDescriptor = (wintypes.BYTE * lpdwBufferLength.value)() 

891 advapi32.MakeSelfRelativeSD( 

892 pAbsoluteSecurityDescriptor, 

893 pSelfRelativeSecurityDescriptor, 

894 ctypes.byref(lpdwBufferLength), 

895 ) 

896 return pSelfRelativeSecurityDescriptor 

897 

898 def NewAcl(): 

899 # return a new, initialized ACL (access control list) structure 

900 nAclLength = 32767 # TODO: calculate this: ctypes.sizeof(ACL) + ? 

901 acl_data = ctypes.create_string_buffer(nAclLength) 

902 pAcl = ctypes.cast(acl_data, PACL).contents 

903 advapi32.InitializeAcl(pAcl, nAclLength, ACL_REVISION) 

904 return pAcl 

905 

906 SidAdmins = CreateWellKnownSid(WinBuiltinAdministratorsSid) 

907 SidUser = LookupAccountName("", GetUserNameEx(NameSamCompatible))[0] 

908 

909 Acl = NewAcl() 

910 AddAccessAllowedAce(Acl, ACL_REVISION, FILE_ALL_ACCESS, SidAdmins) 

911 AddAccessAllowedAce( 

912 Acl, 

913 ACL_REVISION, 

914 FILE_GENERIC_READ | FILE_GENERIC_WRITE | DELETE, 

915 SidUser, 

916 ) 

917 

918 SelfRelativeSD = GetFileSecurity(fname, DACL_SECURITY_INFORMATION) 

919 AbsoluteSD = MakeAbsoluteSD(SelfRelativeSD) 

920 SetSecurityDescriptorDacl(AbsoluteSD, 1, Acl, 0) 

921 SelfRelativeSD = MakeSelfRelativeSD(AbsoluteSD) 

922 

923 SetFileSecurity(fname, DACL_SECURITY_INFORMATION, SelfRelativeSD) 

924 

925 

926def get_file_mode(fname: str) -> int: 

927 """Retrieves the file mode corresponding to fname in a filesystem-tolerant manner. 

928 

929 Parameters 

930 ---------- 

931 

932 fname : unicode 

933 The path to the file to get mode from 

934 

935 """ 

936 # Some filesystems (e.g., CIFS) auto-enable the execute bit on files. As a result, we 

937 # should tolerate the execute bit on the file's owner when validating permissions - thus 

938 # the missing least significant bit on the third octal digit. In addition, we also tolerate 

939 # the sticky bit being set, so the lsb from the fourth octal digit is also removed. 

940 return ( 

941 stat.S_IMODE(os.stat(fname).st_mode) & 0o6677 

942 ) # Use 4 octal digits since S_IMODE does the same 

943 

944 

945allow_insecure_writes = os.getenv("JUPYTER_ALLOW_INSECURE_WRITES", "false").lower() in ("true", "1") 

946 

947 

948@contextmanager 

949def secure_write(fname: str, binary: bool = False) -> Iterator[Any]: 

950 """Opens a file in the most restricted pattern available for 

951 writing content. This limits the file mode to `0o0600` and yields 

952 the resulting opened filed handle. 

953 

954 Parameters 

955 ---------- 

956 

957 fname : unicode 

958 The path to the file to write 

959 

960 binary: boolean 

961 Indicates that the file is binary 

962 """ 

963 mode = "wb" if binary else "w" 

964 encoding = None if binary else "utf-8" 

965 open_flag = os.O_CREAT | os.O_WRONLY | os.O_TRUNC 

966 try: 

967 os.remove(fname) 

968 except OSError: 

969 # Skip any issues with the file not existing 

970 pass 

971 

972 if os.name == "nt": 

973 if allow_insecure_writes: 

974 # Mounted file systems can have a number of failure modes inside this block. 

975 # For windows machines in insecure mode we simply skip this to avoid failures :/ 

976 issue_insecure_write_warning() 

977 else: 

978 # Python on windows does not respect the group and public bits for chmod, so we need 

979 # to take additional steps to secure the contents. 

980 # Touch file pre-emptively to avoid editing permissions in open files in Windows 

981 fd = os.open(fname, open_flag, 0o0600) 

982 os.close(fd) 

983 open_flag = os.O_WRONLY | os.O_TRUNC 

984 win32_restrict_file_to_user(fname) 

985 

986 with os.fdopen(os.open(fname, open_flag, 0o0600), mode, encoding=encoding) as f: 

987 if os.name != "nt": 

988 # Enforce that the file got the requested permissions before writing 

989 file_mode = get_file_mode(fname) 

990 if file_mode != 0o0600: # noqa 

991 if allow_insecure_writes: 

992 issue_insecure_write_warning() 

993 else: 

994 msg = ( 

995 "Permissions assignment failed for secure file: '{file}'." 

996 " Got '{permissions}' instead of '0o0600'.".format( 

997 file=fname, permissions=oct(file_mode) 

998 ) 

999 ) 

1000 raise RuntimeError(msg) 

1001 yield f 

1002 

1003 

1004def issue_insecure_write_warning() -> None: 

1005 """Issue an insecure write warning.""" 

1006 

1007 def format_warning(msg, *args, **kwargs): 

1008 return str(msg) + "\n" 

1009 

1010 warnings.formatwarning = format_warning # type:ignore[assignment] 

1011 warnings.warn( 

1012 "WARNING: Insecure writes have been enabled via environment variable " 

1013 "'JUPYTER_ALLOW_INSECURE_WRITES'! If this is not intended, remove the " 

1014 "variable or set its value to 'False'.", 

1015 stacklevel=2, 

1016 )