Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/config.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

477 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6"""Parser for reading and writing configuration files.""" 

7 

8__all__ = ["GitConfigParser", "SectionConstraint"] 

9 

10import abc 

11import configparser as cp 

12import fnmatch 

13from functools import wraps 

14import inspect 

15from io import BufferedReader, IOBase 

16import logging 

17import os 

18import os.path as osp 

19import re 

20import sys 

21 

22from git.compat import defenc, force_text 

23from git.util import LockFile 

24 

25# typing------------------------------------------------------- 

26 

27from typing import ( 

28 Any, 

29 Callable, 

30 Generic, 

31 IO, 

32 List, 

33 Dict, 

34 Sequence, 

35 TYPE_CHECKING, 

36 Tuple, 

37 TypeVar, 

38 Union, 

39 cast, 

40) 

41 

42from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T 

43 

44if TYPE_CHECKING: 

45 from io import BytesIO 

46 

47 from git.repo.base import Repo 

48 

49T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser") 

50T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool) 

51 

52if sys.version_info[:3] < (3, 7, 2): 

53 # typing.Ordereddict not added until Python 3.7.2. 

54 from collections import OrderedDict 

55 

56 OrderedDict_OMD = OrderedDict 

57else: 

58 from typing import OrderedDict 

59 

60 OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc] 

61 

62# ------------------------------------------------------------- 

63 

64_logger = logging.getLogger(__name__) 

65 

66CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository") 

67"""The configuration level of a configuration file.""" 

68 

69CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch|hasconfig:remote\.\*\.url):(.+)\"") 

70"""Section pattern to detect conditional includes. 

71 

72See: https://git-scm.com/docs/git-config#_conditional_includes 

73""" 

74 

75UNSAFE_CONFIG_CHARS_RE = re.compile(r"[\r\n\x00]") 

76"""Characters that cannot be safely written in config names or values.""" 

77 

78 

79class MetaParserBuilder(abc.ABCMeta): # noqa: B024 

80 """Utility class wrapping base-class methods into decorators that assure read-only 

81 properties.""" 

82 

83 def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder": 

84 """Equip all base-class methods with a needs_values decorator, and all non-const 

85 methods with a :func:`set_dirty_and_flush_changes` decorator in addition to 

86 that. 

87 """ 

88 kmm = "_mutating_methods_" 

89 if kmm in clsdict: 

90 mutating_methods = clsdict[kmm] 

91 for base in bases: 

92 methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_")) 

93 for method_name, method in methods: 

94 if method_name in clsdict: 

95 continue 

96 method_with_values = needs_values(method) 

97 if method_name in mutating_methods: 

98 method_with_values = set_dirty_and_flush_changes(method_with_values) 

99 # END mutating methods handling 

100 

101 clsdict[method_name] = method_with_values 

102 # END for each name/method pair 

103 # END for each base 

104 # END if mutating methods configuration is set 

105 

106 new_type = super().__new__(cls, name, bases, clsdict) 

107 return new_type 

108 

109 

110def needs_values(func: Callable[..., _T]) -> Callable[..., _T]: 

111 """Return a method for ensuring we read values (on demand) before we try to access 

112 them.""" 

113 

114 @wraps(func) 

115 def assure_data_present(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: 

116 self.read() 

117 return func(self, *args, **kwargs) 

118 

119 # END wrapper method 

120 return assure_data_present 

121 

122 

123def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]: 

124 """Return a method that checks whether given non constant function may be called. 

125 

126 If so, the instance will be set dirty. Additionally, we flush the changes right to 

127 disk. 

128 """ 

129 

130 def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T: 

131 rval = non_const_func(self, *args, **kwargs) 

132 self._dirty = True 

133 self.write() 

134 return rval 

135 

136 # END wrapper method 

137 flush_changes.__name__ = non_const_func.__name__ 

138 return flush_changes 

139 

140 

141class SectionConstraint(Generic[T_ConfigParser]): 

142 """Constrains a ConfigParser to only option commands which are constrained to 

143 always use the section we have been initialized with. 

144 

145 It supports all ConfigParser methods that operate on an option. 

146 

147 :note: 

148 If used as a context manager, will release the wrapped ConfigParser. 

149 """ 

150 

151 __slots__ = ("_config", "_section_name") 

152 

153 _valid_attrs_ = ( 

154 "get_value", 

155 "set_value", 

156 "get", 

157 "set", 

158 "getint", 

159 "getfloat", 

160 "getboolean", 

161 "has_option", 

162 "remove_section", 

163 "remove_option", 

164 "options", 

165 ) 

166 

167 def __init__(self, config: T_ConfigParser, section: str) -> None: 

168 self._config = config 

169 self._section_name = section 

170 

171 def __del__(self) -> None: 

172 # Yes, for some reason, we have to call it explicitly for it to work in PY3 ! 

173 # Apparently __del__ doesn't get call anymore if refcount becomes 0 

174 # Ridiculous ... . 

175 self._config.release() 

176 

177 def __getattr__(self, attr: str) -> Any: 

178 if attr in self._valid_attrs_: 

179 return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs) 

180 return super().__getattribute__(attr) 

181 

182 def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any: 

183 """Call the configuration at the given method which must take a section name as 

184 first argument.""" 

185 return getattr(self._config, method)(self._section_name, *args, **kwargs) 

186 

187 @property 

188 def config(self) -> T_ConfigParser: 

189 """return: ConfigParser instance we constrain""" 

190 return self._config 

191 

192 def release(self) -> None: 

193 """Equivalent to :meth:`GitConfigParser.release`, which is called on our 

194 underlying parser instance.""" 

195 return self._config.release() 

196 

197 def __enter__(self) -> "SectionConstraint[T_ConfigParser]": 

198 self._config.__enter__() 

199 return self 

200 

201 def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None: 

202 self._config.__exit__(exception_type, exception_value, traceback) 

203 

204 

205class _OMD(OrderedDict_OMD): 

206 """Ordered multi-dict.""" 

207 

208 def __setitem__(self, key: str, value: _T) -> None: 

209 super().__setitem__(key, [value]) 

210 

211 def add(self, key: str, value: Any) -> None: 

212 if key not in self: 

213 super().__setitem__(key, [value]) 

214 return 

215 

216 super().__getitem__(key).append(value) 

217 

218 def setall(self, key: str, values: List[_T]) -> None: 

219 super().__setitem__(key, values) 

220 

221 def __getitem__(self, key: str) -> Any: 

222 return super().__getitem__(key)[-1] 

223 

224 def getlast(self, key: str) -> Any: 

225 return super().__getitem__(key)[-1] 

226 

227 def setlast(self, key: str, value: Any) -> None: 

228 if key not in self: 

229 super().__setitem__(key, [value]) 

230 return 

231 

232 prior = super().__getitem__(key) 

233 prior[-1] = value 

234 

235 def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]: 

236 return super().get(key, [default])[-1] 

237 

238 def getall(self, key: str) -> List[_T]: 

239 return super().__getitem__(key) 

240 

241 def items(self) -> List[Tuple[str, _T]]: # type: ignore[override] 

242 """List of (key, last value for key).""" 

243 return [(k, self[k]) for k in self] 

244 

245 def items_all(self) -> List[Tuple[str, List[_T]]]: 

246 """List of (key, list of values for key).""" 

247 return [(k, self.getall(k)) for k in self] 

248 

249 

250def get_config_path(config_level: Lit_config_levels) -> str: 

251 # We do not support an absolute path of the gitconfig on Windows. 

252 # Use the global config instead. 

253 if sys.platform == "win32" and config_level == "system": 

254 config_level = "global" 

255 

256 if config_level == "system": 

257 return "/etc/gitconfig" 

258 elif config_level == "user": 

259 config_home = os.environ.get("XDG_CONFIG_HOME") or osp.join(os.environ.get("HOME", "~"), ".config") 

260 return osp.normpath(osp.expanduser(osp.join(config_home, "git", "config"))) 

261 elif config_level == "global": 

262 return osp.normpath(osp.expanduser("~/.gitconfig")) 

263 elif config_level == "repository": 

264 raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path") 

265 else: 

266 # Should not reach here. Will raise ValueError if does. Static typing will warn 

267 # about missing elifs. 

268 assert_never( # type: ignore[unreachable] 

269 config_level, 

270 ValueError(f"Invalid configuration level: {config_level!r}"), 

271 ) 

272 

273 

274class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder): 

275 """Implements specifics required to read git style configuration files. 

276 

277 This variation behaves much like the :manpage:`git-config(1)` command, such that the 

278 configuration will be read on demand based on the filepath given during 

279 initialization. 

280 

281 The changes will automatically be written once the instance goes out of scope, but 

282 can be triggered manually as well. 

283 

284 The configuration file will be locked if you intend to change values preventing 

285 other instances to write concurrently. 

286 

287 :note: 

288 The config is case-sensitive even when queried, hence section and option names 

289 must match perfectly. 

290 

291 :note: 

292 If used as a context manager, this will release the locked file. 

293 """ 

294 

295 # { Configuration 

296 t_lock = LockFile 

297 """The lock type determines the type of lock to use in new configuration readers. 

298 

299 They must be compatible to the :class:`~git.util.LockFile` interface. 

300 A suitable alternative would be the :class:`~git.util.BlockingLockFile`. 

301 """ 

302 

303 re_comment = re.compile(r"^\s*[#;]") 

304 # } END configuration 

305 

306 optvalueonly_source = r"\s*(?P<option>[^:=\s][^:=]*)" 

307 

308 OPTVALUEONLY = re.compile(optvalueonly_source) 

309 

310 OPTCRE = re.compile(optvalueonly_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$") 

311 

312 del optvalueonly_source 

313 

314 _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set") 

315 """Names of :class:`~configparser.RawConfigParser` methods able to change the 

316 instance.""" 

317 

318 def __init__( 

319 self, 

320 file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None, 

321 read_only: bool = True, 

322 merge_includes: bool = True, 

323 config_level: Union[Lit_config_levels, None] = None, 

324 repo: Union["Repo", None] = None, 

325 ) -> None: 

326 """Initialize a configuration reader to read the given `file_or_files` and to 

327 possibly allow changes to it by setting `read_only` False. 

328 

329 :param file_or_files: 

330 A file path or file object, or a sequence of possibly more than one of them. 

331 

332 :param read_only: 

333 If ``True``, the ConfigParser may only read the data, but not change it. 

334 If ``False``, only a single file path or file object may be given. We will 

335 write back the changes when they happen, or when the ConfigParser is 

336 released. This will not happen if other configuration files have been 

337 included. 

338 

339 :param merge_includes: 

340 If ``True``, we will read files mentioned in ``[include]`` sections and 

341 merge their contents into ours. This makes it impossible to write back an 

342 individual configuration file. Thus, if you want to modify a single 

343 configuration file, turn this off to leave the original dataset unaltered 

344 when reading it. 

345 

346 :param repo: 

347 Reference to repository to use if ``[includeIf]`` sections are found in 

348 configuration files. 

349 """ 

350 cp.RawConfigParser.__init__(self, dict_type=_OMD) 

351 self._dict: Callable[..., _OMD] 

352 self._defaults: _OMD 

353 self._sections: _OMD 

354 

355 # Used in Python 3. Needs to stay in sync with sections for underlying 

356 # implementation to work. 

357 if not hasattr(self, "_proxies"): 

358 self._proxies = self._dict() 

359 

360 if file_or_files is not None: 

361 self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files 

362 else: 

363 if config_level is None: 

364 if read_only: 

365 self._file_or_files = [ 

366 get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository" 

367 ] 

368 else: 

369 raise ValueError("No configuration level or configuration files specified") 

370 else: 

371 self._file_or_files = [get_config_path(config_level)] 

372 

373 self._read_only = read_only 

374 self._dirty = False 

375 self._is_initialized = False 

376 self._merge_includes = merge_includes 

377 self._repo = repo 

378 self._lock: Union["LockFile", None] = None 

379 self._acquire_lock() 

380 

381 def _acquire_lock(self) -> None: 

382 if not self._read_only: 

383 if not self._lock: 

384 if isinstance(self._file_or_files, (str, os.PathLike)): 

385 file_or_files = self._file_or_files 

386 elif isinstance(self._file_or_files, (tuple, list, Sequence)): 

387 raise ValueError( 

388 "Write-ConfigParsers can operate on a single file only, multiple files have been passed" 

389 ) 

390 else: 

391 file_or_files = self._file_or_files.name 

392 

393 # END get filename from handle/stream 

394 # Initialize lock base - we want to write. 

395 self._lock = self.t_lock(file_or_files) 

396 # END lock check 

397 

398 self._lock._obtain_lock() 

399 # END read-only check 

400 

401 def __del__(self) -> None: 

402 """Write pending changes if required and release locks.""" 

403 # NOTE: Only consistent in Python 2. 

404 self.release() 

405 

406 def __enter__(self) -> "GitConfigParser": 

407 self._acquire_lock() 

408 return self 

409 

410 def __exit__(self, *args: Any) -> None: 

411 self.release() 

412 

413 def release(self) -> None: 

414 """Flush changes and release the configuration write lock. This instance must 

415 not be used anymore afterwards. 

416 

417 In Python 3, it's required to explicitly release locks and flush changes, as 

418 ``__del__`` is not called deterministically anymore. 

419 """ 

420 # Checking for the lock here makes sure we do not raise during write() 

421 # in case an invalid parser was created who could not get a lock. 

422 if self.read_only or (self._lock and not self._lock._has_lock()): 

423 return 

424 

425 try: 

426 self.write() 

427 except IOError: 

428 _logger.error("Exception during destruction of GitConfigParser", exc_info=True) 

429 except ReferenceError: 

430 # This happens in Python 3... and usually means that some state cannot be 

431 # written as the sections dict cannot be iterated. This usually happens when 

432 # the interpreter is shutting down. Can it be fixed? 

433 pass 

434 finally: 

435 if self._lock is not None: 

436 self._lock._release_lock() 

437 

438 def optionxform(self, optionstr: str) -> str: 

439 """Do not transform options in any way when writing.""" 

440 return optionstr 

441 

442 def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None: 

443 """Originally a direct copy of the Python 2.4 version of 

444 :meth:`RawConfigParser._read <configparser.RawConfigParser._read>`, to ensure it 

445 uses ordered dicts. 

446 

447 The ordering bug was fixed in Python 2.4, and dict itself keeps ordering since 

448 Python 3.7. This has some other changes, especially that it ignores initial 

449 whitespace, since git uses tabs. (Big comments are removed to be more compact.) 

450 """ 

451 cursect = None # None, or a dictionary. 

452 optname = None 

453 lineno = 0 

454 is_multi_line = False 

455 e = None # None, or an exception. 

456 

457 def string_decode(v: str) -> str: 

458 if v and v.endswith("\\"): 

459 v = v[:-1] 

460 # END cut trailing escapes to prevent decode error 

461 

462 return v.encode(defenc).decode("unicode_escape") 

463 

464 # END string_decode 

465 

466 while True: 

467 # We assume to read binary! 

468 line = fp.readline().decode(defenc) 

469 if not line: 

470 break 

471 lineno = lineno + 1 

472 # Comment or blank line? 

473 if line.strip() == "" or self.re_comment.match(line): 

474 continue 

475 if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR": 

476 # No leading whitespace. 

477 continue 

478 

479 # Is it a section header? 

480 mo = self.SECTCRE.match(line.strip()) 

481 if not is_multi_line and mo: 

482 sectname: str = mo.group("header").strip() 

483 if sectname in self._sections: 

484 cursect = self._sections[sectname] 

485 elif sectname == cp.DEFAULTSECT: 

486 cursect = self._defaults 

487 else: 

488 cursect = self._dict((("__name__", sectname),)) 

489 self._sections[sectname] = cursect 

490 self._proxies[sectname] = None 

491 # So sections can't start with a continuation line. 

492 optname = None 

493 # No section header in the file? 

494 elif cursect is None: 

495 raise cp.MissingSectionHeaderError(fpname, lineno, line) 

496 # An option line? 

497 elif not is_multi_line: 

498 mo = self.OPTCRE.match(line) 

499 if mo: 

500 # We might just have handled the last line, which could contain a quotation we want to remove. 

501 optname, vi, optval = mo.group("option", "vi", "value") 

502 optname = self.optionxform(optname.rstrip()) 

503 

504 if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'): 

505 pos = optval.find(";") 

506 if pos != -1 and optval[pos - 1].isspace(): 

507 optval = optval[:pos] 

508 optval = optval.strip() 

509 

510 if len(optval) < 2 or optval[0] != '"': 

511 # Does not open quoting. 

512 pass 

513 elif optval[-1] != '"': 

514 # Opens quoting and does not close: appears to start multi-line quoting. 

515 is_multi_line = True 

516 optval = string_decode(optval[1:]) 

517 elif optval.find("\\", 1, -1) == -1 and optval.find('"', 1, -1) == -1: 

518 # Opens and closes quoting. Single line, and all we need is quote removal. 

519 optval = optval[1:-1] 

520 # TODO: Handle other quoted content, especially well-formed backslash escapes. 

521 

522 # Preserves multiple values for duplicate optnames. 

523 cursect.add(optname, optval) 

524 else: 

525 # Check if it's an option with no value - it's just ignored by git. 

526 if not self.OPTVALUEONLY.match(line): 

527 if not e: 

528 e = cp.ParsingError(fpname) 

529 e.append(lineno, repr(line)) 

530 continue 

531 else: 

532 line = line.rstrip() 

533 if line.endswith('"'): 

534 is_multi_line = False 

535 line = line[:-1] 

536 # END handle quotations 

537 optval = cursect.getlast(optname) 

538 cursect.setlast(optname, optval + string_decode(line)) 

539 # END parse section or option 

540 # END while reading 

541 

542 # If any parsing errors occurred, raise an exception. 

543 if e: 

544 raise e 

545 

546 def _has_includes(self) -> Union[bool, int]: 

547 return self._merge_includes and len(self._included_paths()) 

548 

549 def _included_paths(self) -> List[Tuple[str, str]]: 

550 """List all paths that must be included to configuration. 

551 

552 :return: 

553 The list of paths, where each path is a tuple of (option, value). 

554 """ 

555 

556 def _all_items(section: str) -> List[Tuple[str, str]]: 

557 """Return all (key, value) pairs for a section, including duplicate keys.""" 

558 return [ 

559 (key, value) 

560 for key, values in self._sections[section].items_all() 

561 if key != "__name__" 

562 for value in values 

563 ] 

564 

565 paths = [] 

566 

567 for section in self.sections(): 

568 if section == "include": 

569 paths += _all_items(section) 

570 

571 match = CONDITIONAL_INCLUDE_REGEXP.search(section) 

572 if match is None or self._repo is None: 

573 continue 

574 

575 keyword = match.group(1) 

576 value = match.group(2).strip() 

577 

578 if keyword in ["gitdir", "gitdir/i"]: 

579 value = osp.expanduser(value) 

580 

581 if not any(value.startswith(s) for s in ["./", "/"]): 

582 value = "**/" + value 

583 if value.endswith("/"): 

584 value += "**" 

585 

586 # Ensure that glob is always case insensitive if required. 

587 if keyword.endswith("/i"): 

588 value = re.sub( 

589 r"[a-zA-Z]", 

590 lambda m: f"[{m.group().lower()!r}{m.group().upper()!r}]", 

591 value, 

592 ) 

593 if self._repo.git_dir: 

594 if fnmatch.fnmatchcase(os.fspath(self._repo.git_dir), value): 

595 paths += _all_items(section) 

596 

597 elif keyword == "onbranch": 

598 try: 

599 branch_name = self._repo.active_branch.name 

600 except TypeError: 

601 # Ignore section if active branch cannot be retrieved. 

602 continue 

603 

604 if fnmatch.fnmatchcase(branch_name, value): 

605 paths += _all_items(section) 

606 elif keyword == "hasconfig:remote.*.url": 

607 for remote in self._repo.remotes: 

608 if fnmatch.fnmatchcase(remote.url, value): 

609 paths += _all_items(section) 

610 break 

611 return paths 

612 

613 def read(self) -> None: # type: ignore[override] 

614 """Read the data stored in the files we have been initialized with. 

615 

616 This will ignore files that cannot be read, possibly leaving an empty 

617 configuration. 

618 

619 :raise IOError: 

620 If a file cannot be handled. 

621 """ 

622 if self._is_initialized: 

623 return 

624 self._is_initialized = True 

625 

626 files_to_read: List[Union[PathLike, IO]] = [""] 

627 if isinstance(self._file_or_files, (str, os.PathLike)): 

628 # For str or Path, as str is a type of Sequence. 

629 files_to_read = [self._file_or_files] 

630 elif not isinstance(self._file_or_files, (tuple, list, Sequence)): 

631 # Could merge with above isinstance once runtime type known. 

632 files_to_read = [self._file_or_files] 

633 else: # For lists or tuples. 

634 files_to_read = list(self._file_or_files) 

635 # END ensure we have a copy of the paths to handle 

636 

637 seen = set(files_to_read) 

638 num_read_include_files = 0 

639 while files_to_read: 

640 file_path = files_to_read.pop(0) 

641 file_ok = False 

642 

643 if hasattr(file_path, "seek"): 

644 # Must be a file-object. 

645 # TODO: Replace cast with assert to narrow type, once sure. 

646 file_path = cast(IO[bytes], file_path) 

647 self._read(file_path, file_path.name) 

648 else: 

649 try: 

650 with open(file_path, "rb") as fp: 

651 file_ok = True 

652 self._read(fp, fp.name) 

653 except IOError: 

654 continue 

655 

656 # Read includes and append those that we didn't handle yet. We expect all 

657 # paths to be normalized and absolute (and will ensure that is the case). 

658 if self._has_includes(): 

659 for _, include_path in self._included_paths(): 

660 if include_path.startswith("~"): 

661 include_path = osp.expanduser(include_path) 

662 if not osp.isabs(include_path): 

663 if not file_ok: 

664 continue 

665 # END ignore relative paths if we don't know the configuration file path 

666 file_path = cast(PathLike, file_path) 

667 assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work" 

668 include_path = osp.join(osp.dirname(file_path), include_path) 

669 # END make include path absolute 

670 include_path = osp.normpath(include_path) 

671 if include_path in seen or not os.access(include_path, os.R_OK): 

672 continue 

673 seen.add(include_path) 

674 # Insert included file to the top to be considered first. 

675 files_to_read.insert(0, include_path) 

676 num_read_include_files += 1 

677 # END each include path in configuration file 

678 # END handle includes 

679 # END for each file object to read 

680 

681 # If there was no file included, we can safely write back (potentially) the 

682 # configuration file without altering its meaning. 

683 if num_read_include_files == 0: 

684 self._merge_includes = False 

685 

686 def _write(self, fp: IO) -> None: 

687 """Write an .ini-format representation of the configuration state in 

688 git compatible format.""" 

689 

690 def write_section(name: str, section_dict: _OMD) -> None: 

691 fp.write(("[%s]\n" % name).encode(defenc)) 

692 

693 values: Sequence[str] # Runtime only gets str in tests, but should be whatever _OMD stores. 

694 v: str 

695 for key, values in section_dict.items_all(): 

696 if key == "__name__": 

697 continue 

698 

699 for v in values: 

700 fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc)) 

701 # END if key is not __name__ 

702 

703 # END section writing 

704 

705 if self._defaults: 

706 write_section(cp.DEFAULTSECT, self._defaults) 

707 value: _OMD 

708 

709 for name, value in self._sections.items(): 

710 write_section(name, value) 

711 

712 def items(self, section_name: str) -> List[Tuple[str, str]]: # type: ignore[override] 

713 """:return: list((option, value), ...) pairs of all items in the given section""" 

714 return [(k, v) for k, v in super().items(section_name) if k != "__name__"] 

715 

716 def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]: 

717 """:return: list((option, [values...]), ...) pairs of all items in the given section""" 

718 rv = _OMD(self._defaults) 

719 

720 for k, vs in self._sections[section_name].items_all(): 

721 if k == "__name__": 

722 continue 

723 

724 if k in rv and rv.getall(k) == vs: 

725 continue 

726 

727 for v in vs: 

728 rv.add(k, v) 

729 

730 return rv.items_all() 

731 

732 @needs_values 

733 def write(self) -> None: 

734 """Write changes to our file, if there are changes at all. 

735 

736 :raise IOError: 

737 If this is a read-only writer instance or if we could not obtain a file 

738 lock. 

739 """ 

740 self._assure_writable("write") 

741 if not self._dirty: 

742 return 

743 

744 if isinstance(self._file_or_files, (list, tuple)): 

745 raise AssertionError( 

746 "Cannot write back if there is not exactly a single file to write to, have %i files" 

747 % len(self._file_or_files) 

748 ) 

749 # END assert multiple files 

750 

751 if self._has_includes(): 

752 _logger.debug( 

753 "Skipping write-back of configuration file as include files were merged in." 

754 + "Set merge_includes=False to prevent this." 

755 ) 

756 return 

757 # END stop if we have include files 

758 

759 fp = self._file_or_files 

760 

761 # We have a physical file on disk, so get a lock. 

762 is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # TODO: Use PathLike (having dropped 3.5). 

763 if is_file_lock and self._lock is not None: # Else raise error? 

764 self._lock._obtain_lock() 

765 

766 if not hasattr(fp, "seek"): 

767 fp = cast(PathLike, fp) 

768 with open(fp, "wb") as fp_open: 

769 self._write(fp_open) 

770 else: 

771 fp = cast("BytesIO", fp) 

772 fp.seek(0) 

773 # Make sure we do not overwrite into an existing file. 

774 if hasattr(fp, "truncate"): 

775 fp.truncate() 

776 self._write(fp) 

777 

778 def _assure_writable(self, method_name: str) -> None: 

779 if self.read_only: 

780 raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name)) 

781 

782 def add_section(self, section: "cp._SectionName") -> None: 

783 """Assures added options will stay in order.""" 

784 self._assure_config_name_safe(section, "section") 

785 return super().add_section(section) 

786 

787 @property 

788 def read_only(self) -> bool: 

789 """:return: ``True`` if this instance may change the configuration file""" 

790 return self._read_only 

791 

792 # FIXME: Figure out if default or return type can really include bool. 

793 def get_value( 

794 self, 

795 section: str, 

796 option: str, 

797 default: Union[int, float, str, bool, None] = None, 

798 ) -> Union[int, float, str, bool]: 

799 """Get an option's value. 

800 

801 If multiple values are specified for this option in the section, the last one 

802 specified is returned. 

803 

804 :param default: 

805 If not ``None``, the given default value will be returned in case the option 

806 did not exist. 

807 

808 :return: 

809 A properly typed value, either int, float or string 

810 

811 :raise TypeError: 

812 In case the value could not be understood. 

813 Otherwise the exceptions known to the ConfigParser will be raised. 

814 """ 

815 try: 

816 valuestr = self.get(section, option) 

817 except Exception: 

818 if default is not None: 

819 return default 

820 raise 

821 

822 return self._string_to_value(valuestr) 

823 

824 def get_values( 

825 self, 

826 section: str, 

827 option: str, 

828 default: Union[int, float, str, bool, None] = None, 

829 ) -> List[Union[int, float, str, bool]]: 

830 """Get an option's values. 

831 

832 If multiple values are specified for this option in the section, all are 

833 returned. 

834 

835 :param default: 

836 If not ``None``, a list containing the given default value will be returned 

837 in case the option did not exist. 

838 

839 :return: 

840 A list of properly typed values, either int, float or string 

841 

842 :raise TypeError: 

843 In case the value could not be understood. 

844 Otherwise the exceptions known to the ConfigParser will be raised. 

845 """ 

846 try: 

847 self.sections() 

848 lst = self._sections[section].getall(option) 

849 except Exception: 

850 if default is not None: 

851 return [default] 

852 raise 

853 

854 return [self._string_to_value(valuestr) for valuestr in lst] 

855 

856 def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]: 

857 types = (int, float) 

858 for numtype in types: 

859 try: 

860 val = numtype(valuestr) 

861 # truncated value ? 

862 if val != float(valuestr): 

863 continue 

864 return val 

865 except (ValueError, TypeError): 

866 continue 

867 # END for each numeric type 

868 

869 # Try boolean values as git uses them. 

870 vl = valuestr.lower() 

871 if vl == "false": 

872 return False 

873 if vl == "true": 

874 return True 

875 

876 if not isinstance(valuestr, str): 

877 raise TypeError( 

878 "Invalid value type: only int, long, float and str are allowed", 

879 valuestr, 

880 ) 

881 

882 return valuestr 

883 

884 def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str: 

885 if isinstance(value, (int, float, bool)): 

886 return str(value) 

887 return force_text(value) 

888 

889 def _value_to_string_safe(self, value: Union[str, bytes, int, float, bool]) -> str: 

890 value_str = self._value_to_string(value) 

891 if UNSAFE_CONFIG_CHARS_RE.search(value_str): 

892 raise ValueError("Git config values must not contain CR, LF, or NUL") 

893 return value_str 

894 

895 def _assure_config_name_safe(self, name: "cp._SectionName", label: str) -> None: 

896 if isinstance(name, str) and UNSAFE_CONFIG_CHARS_RE.search(name): 

897 raise ValueError("Git config %s names must not contain CR, LF, or NUL" % label) 

898 

899 @needs_values 

900 @set_dirty_and_flush_changes 

901 def set( 

902 self, 

903 section: str, 

904 option: str, 

905 value: Union[str, bytes, int, float, bool, None] = None, 

906 ) -> None: 

907 self._assure_config_name_safe(section, "section") 

908 self._assure_config_name_safe(option, "option") 

909 if value is not None: 

910 value = self._value_to_string_safe(value) 

911 return super().set(section, option, value) 

912 

913 @needs_values 

914 @set_dirty_and_flush_changes 

915 def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": 

916 """Set the given option in section to the given value. 

917 

918 This will create the section if required, and will not throw as opposed to the 

919 default ConfigParser ``set`` method. 

920 

921 :param section: 

922 Name of the section in which the option resides or should reside. 

923 

924 :param option: 

925 Name of the options whose value to set. 

926 

927 :param value: 

928 Value to set the option to. It must be a string or convertible to a string. 

929 

930 :return: 

931 This instance 

932 """ 

933 self._assure_config_name_safe(section, "section") 

934 self._assure_config_name_safe(option, "option") 

935 value_str = self._value_to_string_safe(value) 

936 if not self.has_section(section): 

937 self.add_section(section) 

938 super().set(section, option, value_str) 

939 return self 

940 

941 @needs_values 

942 @set_dirty_and_flush_changes 

943 def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": 

944 """Add a value for the given option in section. 

945 

946 This will create the section if required, and will not throw as opposed to the 

947 default ConfigParser ``set`` method. The value becomes the new value of the 

948 option as returned by :meth:`get_value`, and appends to the list of values 

949 returned by :meth:`get_values`. 

950 

951 :param section: 

952 Name of the section in which the option resides or should reside. 

953 

954 :param option: 

955 Name of the option. 

956 

957 :param value: 

958 Value to add to option. It must be a string or convertible to a string. 

959 

960 :return: 

961 This instance 

962 """ 

963 self._assure_config_name_safe(section, "section") 

964 self._assure_config_name_safe(option, "option") 

965 value_str = self._value_to_string_safe(value) 

966 if not self.has_section(section): 

967 self.add_section(section) 

968 self._sections[section].add(option, value_str) 

969 return self 

970 

971 def rename_section(self, section: str, new_name: str) -> "GitConfigParser": 

972 """Rename the given section to `new_name`. 

973 

974 :raise ValueError: 

975 If: 

976 

977 * `section` doesn't exist. 

978 * A section with `new_name` does already exist. 

979 

980 :return: 

981 This instance 

982 """ 

983 if not self.has_section(section): 

984 raise ValueError("Source section '%s' doesn't exist" % section) 

985 self._assure_config_name_safe(new_name, "section") 

986 if self.has_section(new_name): 

987 raise ValueError("Destination section '%s' already exists" % new_name) 

988 

989 super().add_section(new_name) 

990 new_section = self._sections[new_name] 

991 for k, vs in self.items_all(section): 

992 new_section.setall(k, vs) 

993 # END for each value to copy 

994 

995 # This call writes back the changes, which is why we don't have the respective 

996 # decorator. 

997 self.remove_section(section) 

998 return self