Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/config.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

449 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6"""Parser for reading and writing configuration files.""" 

7 

8__all__ = ["GitConfigParser", "SectionConstraint"] 

9 

10import abc 

11import configparser as cp 

12import fnmatch 

13from functools import wraps 

14import inspect 

15from io import BufferedReader, IOBase 

16import logging 

17import os 

18import os.path as osp 

19import re 

20import sys 

21 

22from git.compat import defenc, force_text 

23from git.util import LockFile 

24 

25# typing------------------------------------------------------- 

26 

27from typing import ( 

28 Any, 

29 Callable, 

30 Generic, 

31 IO, 

32 List, 

33 Dict, 

34 Sequence, 

35 TYPE_CHECKING, 

36 Tuple, 

37 TypeVar, 

38 Union, 

39 cast, 

40) 

41 

42from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T 

43 

44if TYPE_CHECKING: 

45 from io import BytesIO 

46 

47 from git.repo.base import Repo 

48 

49T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser") 

50T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool) 

51 

52if sys.version_info[:3] < (3, 7, 2): 

53 # typing.Ordereddict not added until Python 3.7.2. 

54 from collections import OrderedDict 

55 

56 OrderedDict_OMD = OrderedDict 

57else: 

58 from typing import OrderedDict 

59 

60 OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc] 

61 

62# ------------------------------------------------------------- 

63 

64_logger = logging.getLogger(__name__) 

65 

66CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository") 

67"""The configuration level of a configuration file.""" 

68 

69CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch|hasconfig:remote\.\*\.url):(.+)\"") 

70"""Section pattern to detect conditional includes. 

71 

72See: https://git-scm.com/docs/git-config#_conditional_includes 

73""" 

74 

75 

76class MetaParserBuilder(abc.ABCMeta): # noqa: B024 

77 """Utility class wrapping base-class methods into decorators that assure read-only 

78 properties.""" 

79 

80 def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder": 

81 """Equip all base-class methods with a needs_values decorator, and all non-const 

82 methods with a :func:`set_dirty_and_flush_changes` decorator in addition to 

83 that. 

84 """ 

85 kmm = "_mutating_methods_" 

86 if kmm in clsdict: 

87 mutating_methods = clsdict[kmm] 

88 for base in bases: 

89 methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_")) 

90 for method_name, method in methods: 

91 if method_name in clsdict: 

92 continue 

93 method_with_values = needs_values(method) 

94 if method_name in mutating_methods: 

95 method_with_values = set_dirty_and_flush_changes(method_with_values) 

96 # END mutating methods handling 

97 

98 clsdict[method_name] = method_with_values 

99 # END for each name/method pair 

100 # END for each base 

101 # END if mutating methods configuration is set 

102 

103 new_type = super().__new__(cls, name, bases, clsdict) 

104 return new_type 

105 

106 

107def needs_values(func: Callable[..., _T]) -> Callable[..., _T]: 

108 """Return a method for ensuring we read values (on demand) before we try to access 

109 them.""" 

110 

111 @wraps(func) 

112 def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: 

113 self.read() 

114 return func(self, *args, **kwargs) 

115 

116 # END wrapper method 

117 return assure_data_present 

118 

119 

120def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]: 

121 """Return a method that checks whether given non constant function may be called. 

122 

123 If so, the instance will be set dirty. Additionally, we flush the changes right to 

124 disk. 

125 """ 

126 

127 def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: 

128 rval = non_const_func(self, *args, **kwargs) 

129 self._dirty = True 

130 self.write() 

131 return rval 

132 

133 # END wrapper method 

134 flush_changes.__name__ = non_const_func.__name__ 

135 return flush_changes 

136 

137 

138class SectionConstraint(Generic[T_ConfigParser]): 

139 """Constrains a ConfigParser to only option commands which are constrained to 

140 always use the section we have been initialized with. 

141 

142 It supports all ConfigParser methods that operate on an option. 

143 

144 :note: 

145 If used as a context manager, will release the wrapped ConfigParser. 

146 """ 

147 

148 __slots__ = ("_config", "_section_name") 

149 

150 _valid_attrs_ = ( 

151 "get_value", 

152 "set_value", 

153 "get", 

154 "set", 

155 "getint", 

156 "getfloat", 

157 "getboolean", 

158 "has_option", 

159 "remove_section", 

160 "remove_option", 

161 "options", 

162 ) 

163 

164 def __init__(self, config: T_ConfigParser, section: str) -> None: 

165 self._config = config 

166 self._section_name = section 

167 

168 def __del__(self) -> None: 

169 # Yes, for some reason, we have to call it explicitly for it to work in PY3 ! 

170 # Apparently __del__ doesn't get call anymore if refcount becomes 0 

171 # Ridiculous ... . 

172 self._config.release() 

173 

174 def __getattr__(self, attr: str) -> Any: 

175 if attr in self._valid_attrs_: 

176 return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs) 

177 return super().__getattribute__(attr) 

178 

179 def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any: 

180 """Call the configuration at the given method which must take a section name as 

181 first argument.""" 

182 return getattr(self._config, method)(self._section_name, *args, **kwargs) 

183 

184 @property 

185 def config(self) -> T_ConfigParser: 

186 """return: ConfigParser instance we constrain""" 

187 return self._config 

188 

189 def release(self) -> None: 

190 """Equivalent to :meth:`GitConfigParser.release`, which is called on our 

191 underlying parser instance.""" 

192 return self._config.release() 

193 

194 def __enter__(self) -> "SectionConstraint[T_ConfigParser]": 

195 self._config.__enter__() 

196 return self 

197 

198 def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None: 

199 self._config.__exit__(exception_type, exception_value, traceback) 

200 

201 

202class _OMD(OrderedDict_OMD): 

203 """Ordered multi-dict.""" 

204 

205 def __setitem__(self, key: str, value: _T) -> None: 

206 super().__setitem__(key, [value]) 

207 

208 def add(self, key: str, value: Any) -> None: 

209 if key not in self: 

210 super().__setitem__(key, [value]) 

211 return 

212 

213 super().__getitem__(key).append(value) 

214 

215 def setall(self, key: str, values: List[_T]) -> None: 

216 super().__setitem__(key, values) 

217 

218 def __getitem__(self, key: str) -> Any: 

219 return super().__getitem__(key)[-1] 

220 

221 def getlast(self, key: str) -> Any: 

222 return super().__getitem__(key)[-1] 

223 

224 def setlast(self, key: str, value: Any) -> None: 

225 if key not in self: 

226 super().__setitem__(key, [value]) 

227 return 

228 

229 prior = super().__getitem__(key) 

230 prior[-1] = value 

231 

232 def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]: 

233 return super().get(key, [default])[-1] 

234 

235 def getall(self, key: str) -> List[_T]: 

236 return super().__getitem__(key) 

237 

238 def items(self) -> List[Tuple[str, _T]]: # type: ignore[override] 

239 """List of (key, last value for key).""" 

240 return [(k, self[k]) for k in self] 

241 

242 def items_all(self) -> List[Tuple[str, List[_T]]]: 

243 """List of (key, list of values for key).""" 

244 return [(k, self.getall(k)) for k in self] 

245 

246 

247def get_config_path(config_level: Lit_config_levels) -> str: 

248 # We do not support an absolute path of the gitconfig on Windows. 

249 # Use the global config instead. 

250 if sys.platform == "win32" and config_level == "system": 

251 config_level = "global" 

252 

253 if config_level == "system": 

254 return "/etc/gitconfig" 

255 elif config_level == "user": 

256 config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config") 

257 return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config"))) 

258 elif config_level == "global": 

259 return osp.normpath(osp.expanduser("~/.gitconfig")) 

260 elif config_level == "repository": 

261 raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") 

262 else: 

263 # Should not reach here. Will raise ValueError if does. Static typing will warn 

264 # about missing elifs. 

265 assert_never( # type: ignore[unreachable] 

266 config_level, 

267 ValueError(f"Invalid configuration level: {config_level!r}"), 

268 ) 

269 

270 

271class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): 

272 """Implements specifics required to read git style configuration files. 

273 

274 This variation behaves much like the :manpage:`git-config(1)` command, such that the 

275 configuration will be read on demand based on the filepath given during 

276 initialization. 

277 

278 The changes will automatically be written once the instance goes out of scope, but 

279 can be triggered manually as well. 

280 

281 The configuration file will be locked if you intend to change values preventing 

282 other instances to write concurrently. 

283 

284 :note: 

285 The config is case-sensitive even when queried, hence section and option names 

286 must match perfectly. 

287 

288 :note: 

289 If used as a context manager, this will release the locked file. 

290 """ 

291 

292 # { Configuration 

293 t_lock = LockFile 

294 """The lock type determines the type of lock to use in new configuration readers. 

295 

296 They must be compatible to the :class:`~git.util.LockFile` interface. 

297 A suitable alternative would be the :class:`~git.util.BlockingLockFile`. 

298 """ 

299 

300 re_comment = re.compile(r"^\s*[#;]") 

301 # } END configuration 

302 

303 optvalueonly_source = r"\s*(?P<option>[^:=\s][^:=]*)" 

304 

305 OPTVALUEONLY = re.compile(optvalueonly_source) 

306 

307 OPTCRE = re.compile(optvalueonly_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$") 

308 

309 del optvalueonly_source 

310 

311 _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set") 

312 """Names of :class:`~configparser.RawConfigParser` methods able to change the 

313 instance.""" 

314 

315 def __init__( 

316 self, 

317 file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None, 

318 read_only: bool = True, 

319 merge_includes: bool = True, 

320 config_level: Union[Lit_config_levels, None] = None, 

321 repo: Union["Repo", None] = None, 

322 ) -> None: 

323 """Initialize a configuration reader to read the given `file_or_files` and to 

324 possibly allow changes to it by setting `read_only` False. 

325 

326 :param file_or_files: 

327 A file path or file object, or a sequence of possibly more than one of them. 

328 

329 :param read_only: 

330 If ``True``, the ConfigParser may only read the data, but not change it. 

331 If ``False``, only a single file path or file object may be given. We will 

332 write back the changes when they happen, or when the ConfigParser is 

333 released. This will not happen if other configuration files have been 

334 included. 

335 

336 :param merge_includes: 

337 If ``True``, we will read files mentioned in ``[include]`` sections and 

338 merge their contents into ours. This makes it impossible to write back an 

339 individual configuration file. Thus, if you want to modify a single 

340 configuration file, turn this off to leave the original dataset unaltered 

341 when reading it. 

342 

343 :param repo: 

344 Reference to repository to use if ``[includeIf]`` sections are found in 

345 configuration files. 

346 """ 

347 cp.RawConfigParser.__init__(self, dict_type=_OMD) 

348 self._dict: Callable[..., _OMD] 

349 self._defaults: _OMD 

350 self._sections: _OMD 

351 

352 # Used in Python 3. Needs to stay in sync with sections for underlying 

353 # implementation to work. 

354 if not hasattr(self, "_proxies"): 

355 self._proxies = self._dict() 

356 

357 if file_or_files is not None: 

358 self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files 

359 else: 

360 if config_level is None: 

361 if read_only: 

362 self._file_or_files = [ 

363 get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository" 

364 ] 

365 else: 

366 raise ValueError("No configuration level or configuration files specified") 

367 else: 

368 self._file_or_files = [get_config_path(config_level)] 

369 

370 self._read_only = read_only 

371 self._dirty = False 

372 self._is_initialized = False 

373 self._merge_includes = merge_includes 

374 self._repo = repo 

375 self._lock: Union["LockFile", None] = None 

376 self._acquire_lock() 

377 

378 def _acquire_lock(self) -> None: 

379 if not self._read_only: 

380 if not self._lock: 

381 if isinstance(self._file_or_files, (str, os.PathLike)): 

382 file_or_files = self._file_or_files 

383 elif isinstance(self._file_or_files, (tuple, list, Sequence)): 

384 raise ValueError( 

385 "Write-ConfigParsers can operate on a single file only, multiple files have been passed" 

386 ) 

387 else: 

388 file_or_files = self._file_or_files.name 

389 

390 # END get filename from handle/stream 

391 # Initialize lock base - we want to write. 

392 self._lock = self.t_lock(file_or_files) 

393 # END lock check 

394 

395 self._lock._obtain_lock() 

396 # END read-only check 

397 

398 def __del__(self) -> None: 

399 """Write pending changes if required and release locks.""" 

400 # NOTE: Only consistent in Python 2. 

401 self.release() 

402 

403 def __enter__(self) -> "GitConfigParser": 

404 self._acquire_lock() 

405 return self 

406 

407 def __exit__(self, *args: Any) -> None: 

408 self.release() 

409 

410 def release(self) -> None: 

411 """Flush changes and release the configuration write lock. This instance must 

412 not be used anymore afterwards. 

413 

414 In Python 3, it's required to explicitly release locks and flush changes, as 

415 ``__del__`` is not called deterministically anymore. 

416 """ 

417 # Checking for the lock here makes sure we do not raise during write() 

418 # in case an invalid parser was created who could not get a lock. 

419 if self.read_only or (self._lock and not self._lock._has_lock()): 

420 return 

421 

422 try: 

423 self.write() 

424 except IOError: 

425 _logger.error("Exception during destruction of GitConfigParser", exc_info=True) 

426 except ReferenceError: 

427 # This happens in Python 3... and usually means that some state cannot be 

428 # written as the sections dict cannot be iterated. This usually happens when 

429 # the interpreter is shutting down. Can it be fixed? 

430 pass 

431 finally: 

432 if self._lock is not None: 

433 self._lock._release_lock() 

434 

435 def optionxform(self, optionstr: str) -> str: 

436 """Do not transform options in any way when writing.""" 

437 return optionstr 

438 

439 def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None: 

440 """Originally a direct copy of the Python 2.4 version of 

441 :meth:`RawConfigParser._read <configparser.RawConfigParser._read>`, to ensure it 

442 uses ordered dicts. 

443 

444 The ordering bug was fixed in Python 2.4, and dict itself keeps ordering since 

445 Python 3.7. This has some other changes, especially that it ignores initial 

446 whitespace, since git uses tabs. (Big comments are removed to be more compact.) 

447 """ 

448 cursect = None # None, or a dictionary. 

449 optname = None 

450 lineno = 0 

451 is_multi_line = False 

452 e = None # None, or an exception. 

453 

454 def string_decode(v: str) -> str: 

455 if v and v.endswith("\\"): 

456 v = v[:-1] 

457 # END cut trailing escapes to prevent decode error 

458 

459 return v.encode(defenc).decode("unicode_escape") 

460 

461 # END string_decode 

462 

463 while True: 

464 # We assume to read binary! 

465 line = fp.readline().decode(defenc) 

466 if not line: 

467 break 

468 lineno = lineno + 1 

469 # Comment or blank line? 

470 if line.strip() == "" or self.re_comment.match(line): 

471 continue 

472 if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR": 

473 # No leading whitespace. 

474 continue 

475 

476 # Is it a section header? 

477 mo = self.SECTCRE.match(line.strip()) 

478 if not is_multi_line and mo: 

479 sectname: str = mo.group("header").strip() 

480 if sectname in self._sections: 

481 cursect = self._sections[sectname] 

482 elif sectname == cp.DEFAULTSECT: 

483 cursect = self._defaults 

484 else: 

485 cursect = self._dict((("__name__", sectname),)) 

486 self._sections[sectname] = cursect 

487 self._proxies[sectname] = None 

488 # So sections can't start with a continuation line. 

489 optname = None 

490 # No section header in the file? 

491 elif cursect is None: 

492 raise cp.MissingSectionHeaderError(fpname, lineno, line) 

493 # An option line? 

494 elif not is_multi_line: 

495 mo = self.OPTCRE.match(line) 

496 if mo: 

497 # We might just have handled the last line, which could contain a quotation we want to remove. 

498 optname, vi, optval = mo.group("option", "vi", "value") 

499 optname = self.optionxform(optname.rstrip()) 

500 

501 if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'): 

502 pos = optval.find(";") 

503 if pos != -1 and optval[pos - 1].isspace(): 

504 optval = optval[:pos] 

505 optval = optval.strip() 

506 

507 if len(optval) < 2 or optval[0] != '"': 

508 # Does not open quoting. 

509 pass 

510 elif optval[-1] != '"': 

511 # Opens quoting and does not close: appears to start multi-line quoting. 

512 is_multi_line = True 

513 optval = string_decode(optval[1:]) 

514 elif optval.find("\\", 1, -1) == -1 and optval.find('"', 1, -1) == -1: 

515 # Opens and closes quoting. Single line, and all we need is quote removal. 

516 optval = optval[1:-1] 

517 # TODO: Handle other quoted content, especially well-formed backslash escapes. 

518 

519 # Preserves multiple values for duplicate optnames. 

520 cursect.add(optname, optval) 

521 else: 

522 # Check if it's an option with no value - it's just ignored by git. 

523 if not self.OPTVALUEONLY.match(line): 

524 if not e: 

525 e = cp.ParsingError(fpname) 

526 e.append(lineno, repr(line)) 

527 continue 

528 else: 

529 line = line.rstrip() 

530 if line.endswith('"'): 

531 is_multi_line = False 

532 line = line[:-1] 

533 # END handle quotations 

534 optval = cursect.getlast(optname) 

535 cursect.setlast(optname, optval + string_decode(line)) 

536 # END parse section or option 

537 # END while reading 

538 

539 # If any parsing errors occurred, raise an exception. 

540 if e: 

541 raise e 

542 

543 def _has_includes(self) -> Union[bool, int]: 

544 return self._merge_includes and len(self._included_paths()) 

545 

546 def _included_paths(self) -> List[Tuple[str, str]]: 

547 """List all paths that must be included to configuration. 

548 

549 :return: 

550 The list of paths, where each path is a tuple of (option, value). 

551 """ 

552 paths = [] 

553 

554 for section in self.sections(): 

555 if section == "include": 

556 paths += self.items(section) 

557 

558 match = CONDITIONAL_INCLUDE_REGEXP.search(section) 

559 if match is None or self._repo is None: 

560 continue 

561 

562 keyword = match.group(1) 

563 value = match.group(2).strip() 

564 

565 if keyword in ["gitdir", "gitdir/i"]: 

566 value = osp.expanduser(value) 

567 

568 if not any(value.startswith(s) for s in ["./", "/"]): 

569 value = "**/" + value 

570 if value.endswith("/"): 

571 value += "**" 

572 

573 # Ensure that glob is always case insensitive if required. 

574 if keyword.endswith("/i"): 

575 value = re.sub( 

576 r"[a-zA-Z]", 

577 lambda m: f"[{m.group().lower()!r}{m.group().upper()!r}]", 

578 value, 

579 ) 

580 if self._repo.git_dir: 

581 if fnmatch.fnmatchcase(str(self._repo.git_dir), value): 

582 paths += self.items(section) 

583 

584 elif keyword == "onbranch": 

585 try: 

586 branch_name = self._repo.active_branch.name 

587 except TypeError: 

588 # Ignore section if active branch cannot be retrieved. 

589 continue 

590 

591 if fnmatch.fnmatchcase(branch_name, value): 

592 paths += self.items(section) 

593 elif keyword == "hasconfig:remote.*.url": 

594 for remote in self._repo.remotes: 

595 if fnmatch.fnmatchcase(remote.url, value): 

596 paths += self.items(section) 

597 break 

598 return paths 

599 

600 def read(self) -> None: # type: ignore[override] 

601 """Read the data stored in the files we have been initialized with. 

602 

603 This will ignore files that cannot be read, possibly leaving an empty 

604 configuration. 

605 

606 :raise IOError: 

607 If a file cannot be handled. 

608 """ 

609 if self._is_initialized: 

610 return 

611 self._is_initialized = True 

612 

613 files_to_read: List[Union[PathLike, IO]] = [""] 

614 if isinstance(self._file_or_files, (str, os.PathLike)): 

615 # For str or Path, as str is a type of Sequence. 

616 files_to_read = [self._file_or_files] 

617 elif not isinstance(self._file_or_files, (tuple, list, Sequence)): 

618 # Could merge with above isinstance once runtime type known. 

619 files_to_read = [self._file_or_files] 

620 else: # For lists or tuples. 

621 files_to_read = list(self._file_or_files) 

622 # END ensure we have a copy of the paths to handle 

623 

624 seen = set(files_to_read) 

625 num_read_include_files = 0 

626 while files_to_read: 

627 file_path = files_to_read.pop(0) 

628 file_ok = False 

629 

630 if hasattr(file_path, "seek"): 

631 # Must be a file-object. 

632 # TODO: Replace cast with assert to narrow type, once sure. 

633 file_path = cast(IO[bytes], file_path) 

634 self._read(file_path, file_path.name) 

635 else: 

636 try: 

637 with open(file_path, "rb") as fp: 

638 file_ok = True 

639 self._read(fp, fp.name) 

640 except IOError: 

641 continue 

642 

643 # Read includes and append those that we didn't handle yet. We expect all 

644 # paths to be normalized and absolute (and will ensure that is the case). 

645 if self._has_includes(): 

646 for _, include_path in self._included_paths(): 

647 if include_path.startswith("~"): 

648 include_path = osp.expanduser(include_path) 

649 if not osp.isabs(include_path): 

650 if not file_ok: 

651 continue 

652 # END ignore relative paths if we don't know the configuration file path 

653 file_path = cast(PathLike, file_path) 

654 assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work" 

655 include_path = osp.join(osp.dirname(file_path), include_path) 

656 # END make include path absolute 

657 include_path = osp.normpath(include_path) 

658 if include_path in seen or not os.access(include_path, os.R_OK): 

659 continue 

660 seen.add(include_path) 

661 # Insert included file to the top to be considered first. 

662 files_to_read.insert(0, include_path) 

663 num_read_include_files += 1 

664 # END each include path in configuration file 

665 # END handle includes 

666 # END for each file object to read 

667 

668 # If there was no file included, we can safely write back (potentially) the 

669 # configuration file without altering its meaning. 

670 if num_read_include_files == 0: 

671 self._merge_includes = False 

672 

673 def _write(self, fp: IO) -> None: 

674 """Write an .ini-format representation of the configuration state in 

675 git compatible format.""" 

676 

677 def write_section(name: str, section_dict: _OMD) -> None: 

678 fp.write(("[%s]\n" % name).encode(defenc)) 

679 

680 values: Sequence[str] # Runtime only gets str in tests, but should be whatever _OMD stores. 

681 v: str 

682 for key, values in section_dict.items_all(): 

683 if key == "__name__": 

684 continue 

685 

686 for v in values: 

687 fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc)) 

688 # END if key is not __name__ 

689 

690 # END section writing 

691 

692 if self._defaults: 

693 write_section(cp.DEFAULTSECT, self._defaults) 

694 value: _OMD 

695 

696 for name, value in self._sections.items(): 

697 write_section(name, value) 

698 

699 def items(self, section_name: str) -> List[Tuple[str, str]]: # type: ignore[override] 

700 """:return: list((option, value), ...) pairs of all items in the given section""" 

701 return [(k, v) for k, v in super().items(section_name) if k != "__name__"] 

702 

703 def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]: 

704 """:return: list((option, [values...]), ...) pairs of all items in the given section""" 

705 rv = _OMD(self._defaults) 

706 

707 for k, vs in self._sections[section_name].items_all(): 

708 if k == "__name__": 

709 continue 

710 

711 if k in rv and rv.getall(k) == vs: 

712 continue 

713 

714 for v in vs: 

715 rv.add(k, v) 

716 

717 return rv.items_all() 

718 

719 @needs_values 

720 def write(self) -> None: 

721 """Write changes to our file, if there are changes at all. 

722 

723 :raise IOError: 

724 If this is a read-only writer instance or if we could not obtain a file 

725 lock. 

726 """ 

727 self._assure_writable("write") 

728 if not self._dirty: 

729 return 

730 

731 if isinstance(self._file_or_files, (list, tuple)): 

732 raise AssertionError( 

733 "Cannot write back if there is not exactly a single file to write to, have %i files" 

734 % len(self._file_or_files) 

735 ) 

736 # END assert multiple files 

737 

738 if self._has_includes(): 

739 _logger.debug( 

740 "Skipping write-back of configuration file as include files were merged in." 

741 + "Set merge_includes=False to prevent this." 

742 ) 

743 return 

744 # END stop if we have include files 

745 

746 fp = self._file_or_files 

747 

748 # We have a physical file on disk, so get a lock. 

749 is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # TODO: Use PathLike (having dropped 3.5). 

750 if is_file_lock and self._lock is not None: # Else raise error? 

751 self._lock._obtain_lock() 

752 

753 if not hasattr(fp, "seek"): 

754 fp = cast(PathLike, fp) 

755 with open(fp, "wb") as fp_open: 

756 self._write(fp_open) 

757 else: 

758 fp = cast("BytesIO", fp) 

759 fp.seek(0) 

760 # Make sure we do not overwrite into an existing file. 

761 if hasattr(fp, "truncate"): 

762 fp.truncate() 

763 self._write(fp) 

764 

765 def _assure_writable(self, method_name: str) -> None: 

766 if self.read_only: 

767 raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name)) 

768 

769 def add_section(self, section: "cp._SectionName") -> None: 

770 """Assures added options will stay in order.""" 

771 return super().add_section(section) 

772 

773 @property 

774 def read_only(self) -> bool: 

775 """:return: ``True`` if this instance may change the configuration file""" 

776 return self._read_only 

777 

778 # FIXME: Figure out if default or return type can really include bool. 

779 def get_value( 

780 self, 

781 section: str, 

782 option: str, 

783 default: Union[int, float, str, bool, None] = None, 

784 ) -> Union[int, float, str, bool]: 

785 """Get an option's value. 

786 

787 If multiple values are specified for this option in the section, the last one 

788 specified is returned. 

789 

790 :param default: 

791 If not ``None``, the given default value will be returned in case the option 

792 did not exist. 

793 

794 :return: 

795 A properly typed value, either int, float or string 

796 

797 :raise TypeError: 

798 In case the value could not be understood. 

799 Otherwise the exceptions known to the ConfigParser will be raised. 

800 """ 

801 try: 

802 valuestr = self.get(section, option) 

803 except Exception: 

804 if default is not None: 

805 return default 

806 raise 

807 

808 return self._string_to_value(valuestr) 

809 

810 def get_values( 

811 self, 

812 section: str, 

813 option: str, 

814 default: Union[int, float, str, bool, None] = None, 

815 ) -> List[Union[int, float, str, bool]]: 

816 """Get an option's values. 

817 

818 If multiple values are specified for this option in the section, all are 

819 returned. 

820 

821 :param default: 

822 If not ``None``, a list containing the given default value will be returned 

823 in case the option did not exist. 

824 

825 :return: 

826 A list of properly typed values, either int, float or string 

827 

828 :raise TypeError: 

829 In case the value could not be understood. 

830 Otherwise the exceptions known to the ConfigParser will be raised. 

831 """ 

832 try: 

833 self.sections() 

834 lst = self._sections[section].getall(option) 

835 except Exception: 

836 if default is not None: 

837 return [default] 

838 raise 

839 

840 return [self._string_to_value(valuestr) for valuestr in lst] 

841 

842 def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]: 

843 types = (int, float) 

844 for numtype in types: 

845 try: 

846 val = numtype(valuestr) 

847 # truncated value ? 

848 if val != float(valuestr): 

849 continue 

850 return val 

851 except (ValueError, TypeError): 

852 continue 

853 # END for each numeric type 

854 

855 # Try boolean values as git uses them. 

856 vl = valuestr.lower() 

857 if vl == "false": 

858 return False 

859 if vl == "true": 

860 return True 

861 

862 if not isinstance(valuestr, str): 

863 raise TypeError( 

864 "Invalid value type: only int, long, float and str are allowed", 

865 valuestr, 

866 ) 

867 

868 return valuestr 

869 

870 def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str: 

871 if isinstance(value, (int, float, bool)): 

872 return str(value) 

873 return force_text(value) 

874 

875 @needs_values 

876 @set_dirty_and_flush_changes 

877 def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": 

878 """Set the given option in section to the given value. 

879 

880 This will create the section if required, and will not throw as opposed to the 

881 default ConfigParser ``set`` method. 

882 

883 :param section: 

884 Name of the section in which the option resides or should reside. 

885 

886 :param option: 

887 Name of the options whose value to set. 

888 

889 :param value: 

890 Value to set the option to. It must be a string or convertible to a string. 

891 

892 :return: 

893 This instance 

894 """ 

895 if not self.has_section(section): 

896 self.add_section(section) 

897 self.set(section, option, self._value_to_string(value)) 

898 return self 

899 

900 @needs_values 

901 @set_dirty_and_flush_changes 

902 def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": 

903 """Add a value for the given option in section. 

904 

905 This will create the section if required, and will not throw as opposed to the 

906 default ConfigParser ``set`` method. The value becomes the new value of the 

907 option as returned by :meth:`get_value`, and appends to the list of values 

908 returned by :meth:`get_values`. 

909 

910 :param section: 

911 Name of the section in which the option resides or should reside. 

912 

913 :param option: 

914 Name of the option. 

915 

916 :param value: 

917 Value to add to option. It must be a string or convertible to a string. 

918 

919 :return: 

920 This instance 

921 """ 

922 if not self.has_section(section): 

923 self.add_section(section) 

924 self._sections[section].add(option, self._value_to_string(value)) 

925 return self 

926 

927 def rename_section(self, section: str, new_name: str) -> "GitConfigParser": 

928 """Rename the given section to `new_name`. 

929 

930 :raise ValueError: 

931 If: 

932 

933 * `section` doesn't exist. 

934 * A section with `new_name` does already exist. 

935 

936 :return: 

937 This instance 

938 """ 

939 if not self.has_section(section): 

940 raise ValueError("Source section '%s' doesn't exist" % section) 

941 if self.has_section(new_name): 

942 raise ValueError("Destination section '%s' already exists" % new_name) 

943 

944 super().add_section(new_name) 

945 new_section = self._sections[new_name] 

946 for k, vs in self.items_all(section): 

947 new_section.setall(k, vs) 

948 # END for each value to copy 

949 

950 # This call writes back the changes, which is why we don't have the respective 

951 # decorator. 

952 self.remove_section(section) 

953 return self