Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/config.py: 71%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
6"""Parser for reading and writing configuration files."""
8__all__ = ["GitConfigParser", "SectionConstraint"]
10import abc
11import configparser as cp
12import fnmatch
13from functools import wraps
14import inspect
15from io import BufferedReader, IOBase
16import logging
17import os
18import os.path as osp
19import re
20import sys
22from git.compat import defenc, force_text
23from git.util import LockFile
25# typing-------------------------------------------------------
27from typing import (
28 Any,
29 Callable,
30 Generic,
31 IO,
32 List,
33 Dict,
34 Sequence,
35 TYPE_CHECKING,
36 Tuple,
37 TypeVar,
38 Union,
39 cast,
40)
42from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T
44if TYPE_CHECKING:
45 from io import BytesIO
47 from git.repo.base import Repo
49T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser")
50T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool)
52if sys.version_info[:3] < (3, 7, 2):
53 # typing.Ordereddict not added until Python 3.7.2.
54 from collections import OrderedDict
56 OrderedDict_OMD = OrderedDict
57else:
58 from typing import OrderedDict
60 OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc]
62# -------------------------------------------------------------
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Names of the configuration levels this module knows about. ``get_config_path``
# resolves every level except "repository", for which it raises ``ValueError``.
CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository")
"""The configuration level of a configuration file."""

# Searches a section name such as ``includeIf "gitdir:~/work/"``. The lookbehind
# requires the literal text ``includeIf `` directly before the opening quote.
# Group 1 is the condition keyword, group 2 is everything between the colon and the
# closing quote.
CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch|hasconfig:remote\.\*\.url):(.+)\"")
"""Section pattern to detect conditional includes.

See: https://git-scm.com/docs/git-config#_conditional_includes
"""

# Matches a single carriage return, line feed or NUL character anywhere in a string.
UNSAFE_CONFIG_CHARS_RE = re.compile(r"[\r\n\x00]")
"""Characters that cannot be safely written in config names or values."""
class MetaParserBuilder(abc.ABCMeta):  # noqa: B024
    """Metaclass that wraps public base-class routines with the read/flush decorators.

    It only acts on classes whose body defines ``_mutating_methods_``; every other
    class is created unchanged.
    """

    def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder":
        """Create the class, injecting wrapped copies of inherited public routines.

        Each public routine of each base that the class body does not define itself
        is wrapped with :func:`needs_values`. If its name is listed in
        ``_mutating_methods_`` it is additionally wrapped with
        :func:`set_dirty_and_flush_changes`.
        """
        if "_mutating_methods_" in clsdict:
            mutators = clsdict["_mutating_methods_"]
            for parent in bases:
                for attr_name, routine in inspect.getmembers(parent, inspect.isroutine):
                    # Private/dunder routines and anything the class body (or an
                    # earlier base) already supplied are left alone.
                    if attr_name.startswith("_") or attr_name in clsdict:
                        continue

                    wrapped = needs_values(routine)
                    if attr_name in mutators:
                        wrapped = set_dirty_and_flush_changes(wrapped)
                    clsdict[attr_name] = wrapped

        return super().__new__(cls, name, bases, clsdict)
def needs_values(func: Callable[..., _T]) -> Callable[..., _T]:
    """Wrap `func` so that ``self.read()`` is called before every invocation.

    :param func:
        The method to wrap. It is called with the instance as first argument.

    :return:
        A wrapper carrying the metadata of `func` that reads first and then returns
        whatever `func` returns.
    """

    @wraps(func)
    def read_then_call(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
        # Make sure the data is loaded before the wrapped method touches it.
        self.read()
        result = func(self, *args, **kwargs)
        return result

    return read_then_call
def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]:
    """Wrap a mutating method so the instance is marked dirty and written afterwards.

    The wrapper calls `non_const_func`, then sets ``self._dirty = True`` and calls
    ``self.write()``. If `non_const_func` raises, neither of those steps happens.

    :param non_const_func:
        The mutating method to wrap. It is called with the instance as first argument.

    :return:
        A wrapper that returns whatever `non_const_func` returned.
    """

    # Use wraps() like needs_values does, so the docstring and other metadata of the
    # wrapped method survive, not only its __name__.
    @wraps(non_const_func)
    def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
        rval = non_const_func(self, *args, **kwargs)
        self._dirty = True
        self.write()
        return rval

    return flush_changes
class SectionConstraint(Generic[T_ConfigParser]):
    """Proxy that pins a config parser to one section.

    The names listed in ``_valid_attrs_`` are forwarded to the wrapped parser with
    the section name inserted as first argument.

    :note:
        If used as a context manager, will release the wrapped ConfigParser.
    """

    __slots__ = ("_config", "_section_name")

    _valid_attrs_ = (
        "get_value",
        "set_value",
        "get",
        "set",
        "getint",
        "getfloat",
        "getboolean",
        "has_option",
        "remove_section",
        "remove_option",
        "options",
    )

    def __init__(self, config: T_ConfigParser, section: str) -> None:
        self._config = config
        self._section_name = section

    def __del__(self) -> None:
        # Release explicitly; see the original remark that this is needed in PY3.
        self._config.release()

    def __getattr__(self, attr: str) -> Any:
        # Only reached when normal attribute lookup fails.
        if attr not in self._valid_attrs_:
            return super().__getattribute__(attr)
        return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs)

    def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke `method` on the wrapped parser, passing our section name first."""
        bound_method = getattr(self._config, method)
        return bound_method(self._section_name, *args, **kwargs)

    @property
    def config(self) -> T_ConfigParser:
        """return: ConfigParser instance we constrain"""
        return self._config

    def release(self) -> None:
        """Call ``release()`` on the wrapped parser and return its result."""
        return self._config.release()

    def __enter__(self) -> "SectionConstraint[T_ConfigParser]":
        self._config.__enter__()
        return self

    def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None:
        self._config.__exit__(exception_type, exception_value, traceback)
class _OMD(OrderedDict_OMD):
    """Ordered multi-dict: each key maps to a list of values.

    Plain item access reads and writes the last value of that list.
    """

    def __setitem__(self, key: str, value: _T) -> None:
        # Assigning replaces all previous values with a one-element list.
        super().__setitem__(key, [value])

    def add(self, key: str, value: Any) -> None:
        """Append `value` to the values of `key`, creating the key if needed."""
        if key in self:
            super().__getitem__(key).append(value)
        else:
            super().__setitem__(key, [value])

    def setall(self, key: str, values: List[_T]) -> None:
        """Store `values` itself as the value list of `key`."""
        super().__setitem__(key, values)

    def __getitem__(self, key: str) -> Any:
        stored = super().__getitem__(key)
        return stored[-1]

    def getlast(self, key: str) -> Any:
        """Return the last value stored for `key`."""
        stored = super().__getitem__(key)
        return stored[-1]

    def setlast(self, key: str, value: Any) -> None:
        """Overwrite the last value of `key`, creating the key if needed."""
        if key in self:
            super().__getitem__(key)[-1] = value
        else:
            super().__setitem__(key, [value])

    def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]:
        """Return the last value of `key`, or `default` if the key is absent."""
        fallback = [default]
        return super().get(key, fallback)[-1]

    def getall(self, key: str) -> List[_T]:
        """Return the stored list of values for `key`."""
        return super().__getitem__(key)

    def items(self) -> List[Tuple[str, _T]]:  # type: ignore[override]
        """List of (key, last value for key)."""
        pairs = []
        for key in self:
            pairs.append((key, self[key]))
        return pairs

    def items_all(self) -> List[Tuple[str, List[_T]]]:
        """List of (key, list of values for key)."""
        pairs = []
        for key in self:
            pairs.append((key, self.getall(key)))
        return pairs
def get_config_path(config_level: Lit_config_levels) -> str:
    """Return the path of the configuration file for `config_level`.

    :param config_level:
        One of ``"system"``, ``"user"`` or ``"global"``.

    :return:
        ``/etc/gitconfig`` for "system"; ``git/config`` below ``$XDG_CONFIG_HOME``
        (or below ``$HOME/.config`` if that variable is unset or empty) for "user";
        the expanded ``~/.gitconfig`` for "global".

    :raise ValueError:
        If `config_level` is ``"repository"``.
    """
    # We do not support an absolute path of the gitconfig on Windows.
    # Use the global config instead.
    if config_level == "system" and sys.platform == "win32":
        config_level = "global"

    if config_level == "system":
        return "/etc/gitconfig"

    if config_level == "user":
        xdg_home = os.environ.get("XDG_CONFIG_HOME")
        if not xdg_home:
            xdg_home = osp.join(os.environ.get("HOME", "~"), ".config")
        user_config = osp.join(xdg_home, "git", "config")
        return osp.normpath(osp.expanduser(user_config))

    if config_level == "global":
        return osp.normpath(osp.expanduser("~/.gitconfig"))

    if config_level == "repository":
        raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path")

    # Should not reach here. Static typing will warn about missing branches.
    assert_never(  # type: ignore[unreachable]
        config_level,
        ValueError(f"Invalid configuration level: {config_level!r}"),
    )
class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
    """Implements specifics required to read git style configuration files.

    This variation behaves much like the :manpage:`git-config(1)` command, such that the
    configuration will be read on demand based on the filepath given during
    initialization.

    The changes will automatically be written once the instance goes out of scope, but
    can be triggered manually as well.

    The configuration file will be locked if you intend to change values, preventing
    other instances from writing concurrently.

    :note:
        The config is case-sensitive even when queried, hence section and option names
        must match perfectly.

    :note:
        If used as a context manager, this will release the locked file.
    """

    # { Configuration
    t_lock = LockFile
    """The lock type determines the type of lock to use in new configuration readers.

    They must be compatible to the :class:`~git.util.LockFile` interface.
    A suitable alternative would be the :class:`~git.util.BlockingLockFile`.
    """

    # Matches lines whose first non-whitespace character is ``#`` or ``;``.
    re_comment = re.compile(r"^\s*[#;]")
    # } END configuration

    # Shared prefix of the two option patterns below: optional leading whitespace,
    # then an option name that contains neither ``:`` nor ``=``.
    optvalueonly_source = r"\s*(?P<option>[^:=\s][^:=]*)"

    # An option name on its own, without delimiter and value.
    OPTVALUEONLY = re.compile(optvalueonly_source)

    # Option name, a ``:`` or ``=`` delimiter (group ``vi``) and the rest of the line
    # (group ``value``).
    OPTCRE = re.compile(optvalueonly_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$")

    # The helper string is only needed to build the patterns above.
    del optvalueonly_source

    _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")
    """Names of :class:`~configparser.RawConfigParser` methods able to change the
    instance."""
318 def __init__(
319 self,
320 file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None,
321 read_only: bool = True,
322 merge_includes: bool = True,
323 config_level: Union[Lit_config_levels, None] = None,
324 repo: Union["Repo", None] = None,
325 ) -> None:
326 """Initialize a configuration reader to read the given `file_or_files` and to
327 possibly allow changes to it by setting `read_only` False.
329 :param file_or_files:
330 A file path or file object, or a sequence of possibly more than one of them.
332 :param read_only:
333 If ``True``, the ConfigParser may only read the data, but not change it.
334 If ``False``, only a single file path or file object may be given. We will
335 write back the changes when they happen, or when the ConfigParser is
336 released. This will not happen if other configuration files have been
337 included.
339 :param merge_includes:
340 If ``True``, we will read files mentioned in ``[include]`` sections and
341 merge their contents into ours. This makes it impossible to write back an
342 individual configuration file. Thus, if you want to modify a single
343 configuration file, turn this off to leave the original dataset unaltered
344 when reading it.
346 :param repo:
347 Reference to repository to use if ``[includeIf]`` sections are found in
348 configuration files.
349 """
350 cp.RawConfigParser.__init__(self, dict_type=_OMD)
351 self._dict: Callable[..., _OMD]
352 self._defaults: _OMD
353 self._sections: _OMD
355 # Used in Python 3. Needs to stay in sync with sections for underlying
356 # implementation to work.
357 if not hasattr(self, "_proxies"):
358 self._proxies = self._dict()
360 if file_or_files is not None:
361 self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files
362 else:
363 if config_level is None:
364 if read_only:
365 self._file_or_files = [
366 get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository"
367 ]
368 else:
369 raise ValueError("No configuration level or configuration files specified")
370 else:
371 self._file_or_files = [get_config_path(config_level)]
373 self._read_only = read_only
374 self._dirty = False
375 self._is_initialized = False
376 self._merge_includes = merge_includes
377 self._repo = repo
378 self._lock: Union["LockFile", None] = None
379 self._acquire_lock()
381 def _acquire_lock(self) -> None:
382 if not self._read_only:
383 if not self._lock:
384 if isinstance(self._file_or_files, (str, os.PathLike)):
385 file_or_files = self._file_or_files
386 elif isinstance(self._file_or_files, (tuple, list, Sequence)):
387 raise ValueError(
388 "Write-ConfigParsers can operate on a single file only, multiple files have been passed"
389 )
390 else:
391 file_or_files = self._file_or_files.name
393 # END get filename from handle/stream
394 # Initialize lock base - we want to write.
395 self._lock = self.t_lock(file_or_files)
396 # END lock check
398 self._lock._obtain_lock()
399 # END read-only check
    def __del__(self) -> None:
        """Write pending changes if required and release locks.

        Simply delegates to :meth:`release`.
        """
        # NOTE: Only consistent in Python 2.
        self.release()
    def __enter__(self) -> "GitConfigParser":
        """Acquire the write lock (a no-op for read-only parsers) and return ``self``."""
        self._acquire_lock()
        return self
    def __exit__(self, *args: Any) -> None:
        """Call :meth:`release`; the exception details passed in are ignored."""
        self.release()
413 def release(self) -> None:
414 """Flush changes and release the configuration write lock. This instance must
415 not be used anymore afterwards.
417 In Python 3, it's required to explicitly release locks and flush changes, as
418 ``__del__`` is not called deterministically anymore.
419 """
420 # Checking for the lock here makes sure we do not raise during write()
421 # in case an invalid parser was created who could not get a lock.
422 if self.read_only or (self._lock and not self._lock._has_lock()):
423 return
425 try:
426 self.write()
427 except IOError:
428 _logger.error("Exception during destruction of GitConfigParser", exc_info=True)
429 except ReferenceError:
430 # This happens in Python 3... and usually means that some state cannot be
431 # written as the sections dict cannot be iterated. This usually happens when
432 # the interpreter is shutting down. Can it be fixed?
433 pass
434 finally:
435 if self._lock is not None:
436 self._lock._release_lock()
    def optionxform(self, optionstr: str) -> str:
        """Do not transform options in any way when writing.

        :return:
            `optionstr` unchanged.
        """
        return optionstr
442 def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None:
443 """Originally a direct copy of the Python 2.4 version of
444 :meth:`RawConfigParser._read <configparser.RawConfigParser._read>`, to ensure it
445 uses ordered dicts.
447 The ordering bug was fixed in Python 2.4, and dict itself keeps ordering since
448 Python 3.7. This has some other changes, especially that it ignores initial
449 whitespace, since git uses tabs. (Big comments are removed to be more compact.)
450 """
451 cursect = None # None, or a dictionary.
452 optname = None
453 lineno = 0
454 is_multi_line = False
455 e = None # None, or an exception.
457 def string_decode(v: str) -> str:
458 if v and v.endswith("\\"):
459 v = v[:-1]
460 # END cut trailing escapes to prevent decode error
462 return v.encode(defenc).decode("unicode_escape")
464 # END string_decode
466 while True:
467 # We assume to read binary!
468 line = fp.readline().decode(defenc)
469 if not line:
470 break
471 lineno = lineno + 1
472 # Comment or blank line?
473 if line.strip() == "" or self.re_comment.match(line):
474 continue
475 if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR":
476 # No leading whitespace.
477 continue
479 # Is it a section header?
480 mo = self.SECTCRE.match(line.strip())
481 if not is_multi_line and mo:
482 sectname: str = mo.group("header").strip()
483 if sectname in self._sections:
484 cursect = self._sections[sectname]
485 elif sectname == cp.DEFAULTSECT:
486 cursect = self._defaults
487 else:
488 cursect = self._dict((("__name__", sectname),))
489 self._sections[sectname] = cursect
490 self._proxies[sectname] = None
491 # So sections can't start with a continuation line.
492 optname = None
493 # No section header in the file?
494 elif cursect is None:
495 raise cp.MissingSectionHeaderError(fpname, lineno, line)
496 # An option line?
497 elif not is_multi_line:
498 mo = self.OPTCRE.match(line)
499 if mo:
500 # We might just have handled the last line, which could contain a quotation we want to remove.
501 optname, vi, optval = mo.group("option", "vi", "value")
502 optname = self.optionxform(optname.rstrip())
504 if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'):
505 pos = optval.find(";")
506 if pos != -1 and optval[pos - 1].isspace():
507 optval = optval[:pos]
508 optval = optval.strip()
510 if len(optval) < 2 or optval[0] != '"':
511 # Does not open quoting.
512 pass
513 elif optval[-1] != '"':
514 # Opens quoting and does not close: appears to start multi-line quoting.
515 is_multi_line = True
516 optval = string_decode(optval[1:])
517 elif optval.find("\\", 1, -1) == -1 and optval.find('"', 1, -1) == -1:
518 # Opens and closes quoting. Single line, and all we need is quote removal.
519 optval = optval[1:-1]
520 # TODO: Handle other quoted content, especially well-formed backslash escapes.
522 # Preserves multiple values for duplicate optnames.
523 cursect.add(optname, optval)
524 else:
525 # Check if it's an option with no value - it's just ignored by git.
526 if not self.OPTVALUEONLY.match(line):
527 if not e:
528 e = cp.ParsingError(fpname)
529 e.append(lineno, repr(line))
530 continue
531 else:
532 line = line.rstrip()
533 if line.endswith('"'):
534 is_multi_line = False
535 line = line[:-1]
536 # END handle quotations
537 optval = cursect.getlast(optname)
538 cursect.setlast(optname, optval + string_decode(line))
539 # END parse section or option
540 # END while reading
542 # If any parsing errors occurred, raise an exception.
543 if e:
544 raise e
546 def _has_includes(self) -> Union[bool, int]:
547 return self._merge_includes and len(self._included_paths())
549 def _included_paths(self) -> List[Tuple[str, str]]:
550 """List all paths that must be included to configuration.
552 :return:
553 The list of paths, where each path is a tuple of (option, value).
554 """
556 def _all_items(section: str) -> List[Tuple[str, str]]:
557 """Return all (key, value) pairs for a section, including duplicate keys."""
558 return [
559 (key, value)
560 for key, values in self._sections[section].items_all()
561 if key != "__name__"
562 for value in values
563 ]
565 paths = []
567 for section in self.sections():
568 if section == "include":
569 paths += _all_items(section)
571 match = CONDITIONAL_INCLUDE_REGEXP.search(section)
572 if match is None or self._repo is None:
573 continue
575 keyword = match.group(1)
576 value = match.group(2).strip()
578 if keyword in ["gitdir", "gitdir/i"]:
579 value = osp.expanduser(value)
581 if not any(value.startswith(s) for s in ["./", "/"]):
582 value = "**/" + value
583 if value.endswith("/"):
584 value += "**"
586 # Ensure that glob is always case insensitive if required.
587 if keyword.endswith("/i"):
588 value = re.sub(
589 r"[a-zA-Z]",
590 lambda m: f"[{m.group().lower()!r}{m.group().upper()!r}]",
591 value,
592 )
593 if self._repo.git_dir:
594 if fnmatch.fnmatchcase(os.fspath(self._repo.git_dir), value):
595 paths += _all_items(section)
597 elif keyword == "onbranch":
598 try:
599 branch_name = self._repo.active_branch.name
600 except TypeError:
601 # Ignore section if active branch cannot be retrieved.
602 continue
604 if fnmatch.fnmatchcase(branch_name, value):
605 paths += _all_items(section)
606 elif keyword == "hasconfig:remote.*.url":
607 for remote in self._repo.remotes:
608 if fnmatch.fnmatchcase(remote.url, value):
609 paths += _all_items(section)
610 break
611 return paths
    def read(self) -> None:  # type: ignore[override]
        """Read the data stored in the files we have been initialized with.

        Only the first call does any work; later calls return immediately.

        Paths that cannot be opened are skipped, possibly leaving an empty
        configuration. Errors raised while parsing a file that could be opened are
        not caught here.

        If includes are merged, include paths found so far are queued after each
        file and read before the remaining files. If no include file was queued at
        all, ``_merge_includes`` is switched off.
        """
        if self._is_initialized:
            return
        self._is_initialized = True

        files_to_read: List[Union[PathLike, IO]] = [""]
        if isinstance(self._file_or_files, (str, os.PathLike)):
            # For str or Path, as str is a type of Sequence.
            files_to_read = [self._file_or_files]
        elif not isinstance(self._file_or_files, (tuple, list, Sequence)):
            # Could merge with above isinstance once runtime type known.
            files_to_read = [self._file_or_files]
        else:  # For lists or tuples.
            files_to_read = list(self._file_or_files)
        # END ensure we have a copy of the paths to handle

        # Everything queued so far; used to avoid reading an include twice.
        seen = set(files_to_read)
        num_read_include_files = 0
        while files_to_read:
            file_path = files_to_read.pop(0)
            # Only set for paths we opened ourselves; relative includes need it.
            file_ok = False

            if hasattr(file_path, "seek"):
                # Must be a file-object.
                # TODO: Replace cast with assert to narrow type, once sure.
                file_path = cast(IO[bytes], file_path)
                # NOTE(review): requires the file object to have a ``name`` attribute.
                self._read(file_path, file_path.name)
            else:
                try:
                    with open(file_path, "rb") as fp:
                        file_ok = True
                        self._read(fp, fp.name)
                except IOError:
                    continue

            # Read includes and append those that we didn't handle yet. We expect all
            # paths to be normalized and absolute (and will ensure that is the case).
            if self._has_includes():
                for _, include_path in self._included_paths():
                    if include_path.startswith("~"):
                        include_path = osp.expanduser(include_path)
                    if not osp.isabs(include_path):
                        if not file_ok:
                            continue
                        # END ignore relative paths if we don't know the configuration file path
                        file_path = cast(PathLike, file_path)
                        assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work"
                        # Relative includes are resolved against the including file.
                        include_path = osp.join(osp.dirname(file_path), include_path)
                    # END make include path absolute
                    include_path = osp.normpath(include_path)
                    if include_path in seen or not os.access(include_path, os.R_OK):
                        continue
                    seen.add(include_path)
                    # Insert included file to the top to be considered first.
                    files_to_read.insert(0, include_path)
                    num_read_include_files += 1
                # END each include path in configuration file
            # END handle includes
        # END for each file object to read

        # If there was no file included, we can safely write back (potentially) the
        # configuration file without altering its meaning.
        if num_read_include_files == 0:
            self._merge_includes = False
686 def _write(self, fp: IO) -> None:
687 """Write an .ini-format representation of the configuration state in
688 git compatible format."""
690 def write_section(name: str, section_dict: _OMD) -> None:
691 fp.write(("[%s]\n" % name).encode(defenc))
693 values: Sequence[str] # Runtime only gets str in tests, but should be whatever _OMD stores.
694 v: str
695 for key, values in section_dict.items_all():
696 if key == "__name__":
697 continue
699 for v in values:
700 fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc))
701 # END if key is not __name__
703 # END section writing
705 if self._defaults:
706 write_section(cp.DEFAULTSECT, self._defaults)
707 value: _OMD
709 for name, value in self._sections.items():
710 write_section(name, value)
712 def items(self, section_name: str) -> List[Tuple[str, str]]: # type: ignore[override]
713 """:return: list((option, value), ...) pairs of all items in the given section"""
714 return [(k, v) for k, v in super().items(section_name) if k != "__name__"]
716 def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]:
717 """:return: list((option, [values...]), ...) pairs of all items in the given section"""
718 rv = _OMD(self._defaults)
720 for k, vs in self._sections[section_name].items_all():
721 if k == "__name__":
722 continue
724 if k in rv and rv.getall(k) == vs:
725 continue
727 for v in vs:
728 rv.add(k, v)
730 return rv.items_all()
732 @needs_values
733 def write(self) -> None:
734 """Write changes to our file, if there are changes at all.
736 :raise IOError:
737 If this is a read-only writer instance or if we could not obtain a file
738 lock.
739 """
740 self._assure_writable("write")
741 if not self._dirty:
742 return
744 if isinstance(self._file_or_files, (list, tuple)):
745 raise AssertionError(
746 "Cannot write back if there is not exactly a single file to write to, have %i files"
747 % len(self._file_or_files)
748 )
749 # END assert multiple files
751 if self._has_includes():
752 _logger.debug(
753 "Skipping write-back of configuration file as include files were merged in."
754 + "Set merge_includes=False to prevent this."
755 )
756 return
757 # END stop if we have include files
759 fp = self._file_or_files
761 # We have a physical file on disk, so get a lock.
762 is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # TODO: Use PathLike (having dropped 3.5).
763 if is_file_lock and self._lock is not None: # Else raise error?
764 self._lock._obtain_lock()
766 if not hasattr(fp, "seek"):
767 fp = cast(PathLike, fp)
768 with open(fp, "wb") as fp_open:
769 self._write(fp_open)
770 else:
771 fp = cast("BytesIO", fp)
772 fp.seek(0)
773 # Make sure we do not overwrite into an existing file.
774 if hasattr(fp, "truncate"):
775 fp.truncate()
776 self._write(fp)
778 def _assure_writable(self, method_name: str) -> None:
779 if self.read_only:
780 raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name))
    def add_section(self, section: "cp._SectionName") -> None:
        """Add `section` after checking that its name is safe to write.

        :raise ValueError:
            If `section` is a string containing CR, LF, or NUL.
        """
        # Validate first so an unsafe name never reaches the underlying parser.
        self._assure_config_name_safe(section, "section")
        return super().add_section(section)
    @property
    def read_only(self) -> bool:
        """:return: ``True`` if this instance is read-only, i.e. it may not change the
        configuration file"""
        return self._read_only
792 # FIXME: Figure out if default or return type can really include bool.
793 def get_value(
794 self,
795 section: str,
796 option: str,
797 default: Union[int, float, str, bool, None] = None,
798 ) -> Union[int, float, str, bool]:
799 """Get an option's value.
801 If multiple values are specified for this option in the section, the last one
802 specified is returned.
804 :param default:
805 If not ``None``, the given default value will be returned in case the option
806 did not exist.
808 :return:
809 A properly typed value, either int, float or string
811 :raise TypeError:
812 In case the value could not be understood.
813 Otherwise the exceptions known to the ConfigParser will be raised.
814 """
815 try:
816 valuestr = self.get(section, option)
817 except Exception:
818 if default is not None:
819 return default
820 raise
822 return self._string_to_value(valuestr)
824 def get_values(
825 self,
826 section: str,
827 option: str,
828 default: Union[int, float, str, bool, None] = None,
829 ) -> List[Union[int, float, str, bool]]:
830 """Get an option's values.
832 If multiple values are specified for this option in the section, all are
833 returned.
835 :param default:
836 If not ``None``, a list containing the given default value will be returned
837 in case the option did not exist.
839 :return:
840 A list of properly typed values, either int, float or string
842 :raise TypeError:
843 In case the value could not be understood.
844 Otherwise the exceptions known to the ConfigParser will be raised.
845 """
846 try:
847 self.sections()
848 lst = self._sections[section].getall(option)
849 except Exception:
850 if default is not None:
851 return [default]
852 raise
854 return [self._string_to_value(valuestr) for valuestr in lst]
856 def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]:
857 types = (int, float)
858 for numtype in types:
859 try:
860 val = numtype(valuestr)
861 # truncated value ?
862 if val != float(valuestr):
863 continue
864 return val
865 except (ValueError, TypeError):
866 continue
867 # END for each numeric type
869 # Try boolean values as git uses them.
870 vl = valuestr.lower()
871 if vl == "false":
872 return False
873 if vl == "true":
874 return True
876 if not isinstance(valuestr, str):
877 raise TypeError(
878 "Invalid value type: only int, long, float and str are allowed",
879 valuestr,
880 )
882 return valuestr
884 def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str:
885 if isinstance(value, (int, float, bool)):
886 return str(value)
887 return force_text(value)
889 def _value_to_string_safe(self, value: Union[str, bytes, int, float, bool]) -> str:
890 value_str = self._value_to_string(value)
891 if UNSAFE_CONFIG_CHARS_RE.search(value_str):
892 raise ValueError("Git config values must not contain CR, LF, or NUL")
893 return value_str
895 def _assure_config_name_safe(self, name: "cp._SectionName", label: str) -> None:
896 if isinstance(name, str) and UNSAFE_CONFIG_CHARS_RE.search(name):
897 raise ValueError("Git config %s names must not contain CR, LF, or NUL" % label)
899 @needs_values
900 @set_dirty_and_flush_changes
901 def set(
902 self,
903 section: str,
904 option: str,
905 value: Union[str, bytes, int, float, bool, None] = None,
906 ) -> None:
907 self._assure_config_name_safe(section, "section")
908 self._assure_config_name_safe(option, "option")
909 if value is not None:
910 value = self._value_to_string_safe(value)
911 return super().set(section, option, value)
913 @needs_values
914 @set_dirty_and_flush_changes
915 def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
916 """Set the given option in section to the given value.
918 This will create the section if required, and will not throw as opposed to the
919 default ConfigParser ``set`` method.
921 :param section:
922 Name of the section in which the option resides or should reside.
924 :param option:
925 Name of the options whose value to set.
927 :param value:
928 Value to set the option to. It must be a string or convertible to a string.
930 :return:
931 This instance
932 """
933 self._assure_config_name_safe(section, "section")
934 self._assure_config_name_safe(option, "option")
935 value_str = self._value_to_string_safe(value)
936 if not self.has_section(section):
937 self.add_section(section)
938 super().set(section, option, value_str)
939 return self
941 @needs_values
942 @set_dirty_and_flush_changes
943 def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser":
944 """Add a value for the given option in section.
946 This will create the section if required, and will not throw as opposed to the
947 default ConfigParser ``set`` method. The value becomes the new value of the
948 option as returned by :meth:`get_value`, and appends to the list of values
949 returned by :meth:`get_values`.
951 :param section:
952 Name of the section in which the option resides or should reside.
954 :param option:
955 Name of the option.
957 :param value:
958 Value to add to option. It must be a string or convertible to a string.
960 :return:
961 This instance
962 """
963 self._assure_config_name_safe(section, "section")
964 self._assure_config_name_safe(option, "option")
965 value_str = self._value_to_string_safe(value)
966 if not self.has_section(section):
967 self.add_section(section)
968 self._sections[section].add(option, value_str)
969 return self
971 def rename_section(self, section: str, new_name: str) -> "GitConfigParser":
972 """Rename the given section to `new_name`.
974 :raise ValueError:
975 If:
977 * `section` doesn't exist.
978 * A section with `new_name` does already exist.
980 :return:
981 This instance
982 """
983 if not self.has_section(section):
984 raise ValueError("Source section '%s' doesn't exist" % section)
985 self._assure_config_name_safe(new_name, "section")
986 if self.has_section(new_name):
987 raise ValueError("Destination section '%s' already exists" % new_name)
989 super().add_section(new_name)
990 new_section = self._sections[new_name]
991 for k, vs in self.items_all(section):
992 new_section.setall(k, vs)
993 # END for each value to copy
995 # This call writes back the changes, which is why we don't have the respective
996 # decorator.
997 self.remove_section(section)
998 return self