Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/config.py: 69%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# config.py - Reading and writing Git config files
2# Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
5# General Public License as published by the Free Software Foundation; version 2.0
6# or (at your option) any later version. You can redistribute it and/or
7# modify it under the terms of either of these two licenses.
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15# You should have received a copy of the licenses; if not, see
16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
18# License, Version 2.0.
19#
21"""Reading and writing Git configuration files.
23Todo:
24 * preserve formatting when updating configuration files
25 * treat subsection names as case-insensitive for [branch.foo] style
26 subsections
27"""
29import os
30import sys
31from contextlib import suppress
32from typing import (
33 Any,
34 BinaryIO,
35 Dict,
36 Iterable,
37 Iterator,
38 KeysView,
39 List,
40 MutableMapping,
41 Optional,
42 Tuple,
43 Union,
44 overload,
45)
47from .file import GitFile
# Unique marker used to tell "no default supplied" apart from any real value.
SENTINEL = object()


def lower_key(key):
    """Return a lower-cased form of ``key`` for case-insensitive lookups.

    Byte strings and text strings are lower-cased directly.  Any other
    iterable (for example a section tuple) is rebuilt as the same container
    type with each element lower-cased recursively.  Every other object is
    handed back unchanged.
    """
    if isinstance(key, (bytes, str)):
        return key.lower()

    if not isinstance(key, Iterable):
        return key

    return type(key)(lower_key(part) for part in key)  # type: ignore
class CaseInsensitiveOrderedMultiDict(MutableMapping):
    """Insertion-ordered multi-mapping whose keys compare case-insensitively.

    Every assignment is recorded, in order and under its original key, in
    ``_real``.  ``_keyed`` maps the lower-cased key to the value that was
    assigned most recently.
    """

    def __init__(self) -> None:
        # Ordered (original_key, value) pairs; may hold repeated keys.
        self._real: List[Any] = []
        # lower-cased key -> latest value.
        self._keyed: Dict[Any, Any] = {}

    @classmethod
    def make(cls, dict_in=None):
        """Coerce ``dict_in`` into an instance of this class.

        Args:
          dict_in: ``None``, an existing instance (returned as is) or a
            ``MutableMapping`` whose items are copied over
        Returns:
          An instance of this class
        Raises:
          TypeError: if ``dict_in`` is none of the accepted kinds
        """
        if isinstance(dict_in, cls):
            return dict_in

        result = cls()
        if dict_in is not None:
            if not isinstance(dict_in, MutableMapping):
                raise TypeError
            for key, value in dict_in.items():
                result[key] = value
        return result

    def __len__(self) -> int:
        return len(self._keyed)

    def keys(self) -> KeysView[Tuple[bytes, ...]]:
        """Return a view of the lower-cased keys."""
        return self._keyed.keys()

    def items(self):
        """Iterate over every recorded (original key, value) pair."""
        return iter(self._real)

    def __iter__(self):
        return iter(self._keyed)

    def values(self):
        """Return a view of the latest value stored for each key."""
        return self._keyed.values()

    def __setitem__(self, key, value) -> None:
        self._real.append((key, value))
        self._keyed[lower_key(key)] = value

    def __delitem__(self, key) -> None:
        folded = lower_key(key)
        del self._keyed[folded]
        # Drop every recorded pair for this key, updating the list in place.
        self._real[:] = [
            pair for pair in self._real if lower_key(pair[0]) != folded
        ]

    def __getitem__(self, item):
        return self._keyed[lower_key(item)]

    def get(self, key, default=SENTINEL):
        """Return the value for ``key``.

        When the key is missing, ``default`` is returned if one was given;
        otherwise a new empty instance of this class is returned.
        """
        try:
            return self[key]
        except KeyError:
            if default is SENTINEL:
                return type(self)()
            return default

    def get_all(self, key):
        """Yield every value recorded for ``key``, in insertion order."""
        wanted = lower_key(key)
        yield from (
            value for actual, value in self._real if lower_key(actual) == wanted
        )

    def setdefault(self, key, default=SENTINEL):
        """Return the value for ``key``, storing a default first if missing.

        The stored default is whatever ``get(key, default)`` produces, i.e.
        a new empty instance of this class when no default was given.
        """
        try:
            return self[key]
        except KeyError:
            pass

        self[key] = self.get(key, default)
        return self[key]
# Names and values are stored as raw bytes.
Name = bytes
Value = bytes
# A section is identified by a tuple of byte strings.
Section = Tuple[bytes, ...]

# "Like" aliases describe what callers may pass in; text is also accepted.
NameLike = Union[bytes, str]
ValueLike = Union[bytes, str]
SectionLike = Union[bytes, str, Tuple[Union[bytes, str], ...]]
class Config:
    """A Git configuration."""

    def get(self, section: SectionLike, name: NameLike) -> Value:
        """Look up a single configuration setting.

        Args:
          section: Tuple with section name and optional subsection name
          name: Variable name
        Returns:
          Contents of the setting
        Raises:
          KeyError: if the value is not set
        """
        raise NotImplementedError(self.get)

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Look up every value of a multivar configuration setting.

        Args:
          section: Tuple with section name and optional subsection name
          name: Variable name
        Returns:
          Contents of the setting as iterable
        Raises:
          KeyError: if the value is not set
        """
        raise NotImplementedError(self.get_multivar)

    @overload
    def get_boolean(
        self, section: SectionLike, name: NameLike, default: bool
    ) -> bool: ...

    @overload
    def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]: ...

    def get_boolean(
        self, section: SectionLike, name: NameLike, default: Optional[bool] = None
    ) -> Optional[bool]:
        """Look up a configuration setting and interpret it as a boolean.

        Args:
          section: Tuple with section name and optional subsection name
          name: Name of the setting
          default: Value to return when the setting is not set
        Returns:
          ``True`` or ``False`` for the (case-insensitive) values ``true``
          and ``false``; ``default`` when ``get`` raises ``KeyError``
        Raises:
          ValueError: if the stored value is neither ``true`` nor ``false``
        """
        try:
            raw = self.get(section, name)
        except KeyError:
            return default

        folded = raw.lower()
        if folded == b"true":
            return True
        if folded == b"false":
            return False
        raise ValueError(f"not a valid boolean string: {raw!r}")

    def set(
        self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
    ) -> None:
        """Store a configuration value.

        Args:
          section: Tuple with section name and optional subsection name
          name: Name of the configuration value
          value: value of the setting
        """
        raise NotImplementedError(self.set)

    def items(self, section: SectionLike) -> Iterator[Tuple[Name, Value]]:
        """Iterate over the configuration pairs of one section.

        Args:
          section: Tuple with section name and optional subsection name
        Returns:
          Iterator over (name, value) pairs
        """
        raise NotImplementedError(self.items)

    def sections(self) -> Iterator[Section]:
        """Iterate over the sections.

        Returns: Iterator over section tuples
        """
        raise NotImplementedError(self.sections)

    def has_section(self, name: Section) -> bool:
        """Tell whether a section exists.

        Args:
          name: Name of section to check for
        Returns:
          boolean indicating whether ``name`` is among ``sections()``
        """
        known_sections = self.sections()
        return name in known_sections
class ConfigDict(Config, MutableMapping[Section, MutableMapping[Name, Value]]):
    """Git configuration stored in a dictionary."""

    def __init__(
        self,
        values: Union[
            MutableMapping[Section, MutableMapping[Name, Value]], None
        ] = None,
        encoding: Union[str, None] = None,
    ) -> None:
        """Create a new ConfigDict.

        Args:
          values: Optional initial mapping of sections to their settings
          encoding: Encoding used to turn text arguments into bytes;
            defaults to ``sys.getdefaultencoding()``
        """
        self.encoding = sys.getdefaultencoding() if encoding is None else encoding
        self._values = CaseInsensitiveOrderedMultiDict.make(values)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._values!r})"

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return other._values == self._values

    def __getitem__(self, key: Section) -> MutableMapping[Name, Value]:
        return self._values[key]

    def __setitem__(self, key: Section, value: MutableMapping[Name, Value]) -> None:
        self._values[key] = value

    def __delitem__(self, key: Section) -> None:
        del self._values[key]

    def __iter__(self) -> Iterator[Section]:
        return iter(self._values)

    def __len__(self) -> int:
        return len(self._values)

    @classmethod
    def _parse_setting(cls, name: str):
        """Split a dotted setting name into (section, subsection, variable).

        A name with exactly three dot-separated parts fills all three
        slots; otherwise the subsection slot is ``None`` and the first two
        parts are used.
        """
        parts = name.split(".")
        if len(parts) != 3:
            return (parts[0], None, parts[1])
        return (parts[0], parts[1], parts[2])

    def _check_section_and_name(
        self, section: SectionLike, name: NameLike
    ) -> Tuple[Section, Name]:
        """Normalise a section and a name to their bytes forms.

        A non-tuple section is wrapped in a one-element tuple, and every
        non-bytes component is encoded with ``self.encoding``.
        """
        if not isinstance(section, tuple):
            section = (section,)

        encoding = self.encoding
        checked_section = tuple(
            part if isinstance(part, bytes) else part.encode(encoding)
            for part in section
        )

        if not isinstance(name, bytes):
            name = name.encode(encoding)

        return checked_section, name

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Return an iterator over all values recorded for a setting.

        When a subsection is given but that section is not present, the
        lookup falls back to the plain section.

        Raises:
          KeyError: if the plain section does not exist either
        """
        section, name = self._check_section_and_name(section, name)

        if len(section) > 1:
            with suppress(KeyError):
                return self._values[section].get_all(name)

        return self._values[(section[0],)].get_all(name)

    def get(  # type: ignore[override]
        self,
        section: SectionLike,
        name: NameLike,
    ) -> Value:
        """Return the value of a setting.

        When a subsection is given but the setting is not found there, the
        lookup falls back to the plain section.

        Raises:
          KeyError: if the setting is not set
        """
        section, name = self._check_section_and_name(section, name)

        if len(section) > 1:
            with suppress(KeyError):
                return self._values[section][name]

        return self._values[(section[0],)][name]

    def set(
        self,
        section: SectionLike,
        name: NameLike,
        value: Union[ValueLike, bool],
    ) -> None:
        """Store a setting, creating its section when needed.

        Booleans are stored as ``b"true"`` / ``b"false"``; other non-bytes
        values are encoded with ``self.encoding``.
        """
        section, name = self._check_section_and_name(section, name)

        if isinstance(value, bool):
            value = b"true" if value else b"false"

        if not isinstance(value, bytes):
            value = value.encode(self.encoding)

        section_values = self._values.setdefault(section)
        section_values[name] = value

    def items(  # type: ignore[override]
        self, section: Section
    ) -> Iterator[Tuple[Name, Value]]:
        """Iterate over the (name, value) pairs stored for ``section``."""
        section_values = self._values.get(section)
        return section_values.items()

    def sections(self) -> Iterator[Section]:
        """Return the keys of the underlying section mapping."""
        return self._values.keys()
def _format_string(value: bytes) -> bytes:
    """Render a value for writing to a config file.

    The value is escaped, and additionally wrapped in double quotes when
    writing it bare would change its meaning when parsed again:

    * it starts or ends with a space or tab (which would be stripped), or
    * it contains ``#`` or ``;`` (either of which starts a comment when it
      appears outside quotes).

    Args:
      value: Raw value of the setting
    Returns:
      The escaped, possibly quoted, representation
    """
    needs_quotes = (
        value.startswith((b" ", b"\t"))
        or value.endswith((b" ", b"\t"))
        or b"#" in value
        or b";" in value
    )
    escaped = _escape_value(value)
    if needs_quotes:
        return b'"' + escaped + b'"'
    return escaped
376_ESCAPE_TABLE = {
377 ord(b"\\"): ord(b"\\"),
378 ord(b'"'): ord(b'"'),
379 ord(b"n"): ord(b"\n"),
380 ord(b"t"): ord(b"\t"),
381 ord(b"b"): ord(b"\b"),
382}
383_COMMENT_CHARS = [ord(b"#"), ord(b";")]
384_WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
387def _parse_string(value: bytes) -> bytes:
388 value = bytearray(value.strip())
389 ret = bytearray()
390 whitespace = bytearray()
391 in_quotes = False
392 i = 0
393 while i < len(value):
394 c = value[i]
395 if c == ord(b"\\"):
396 i += 1
397 try:
398 v = _ESCAPE_TABLE[value[i]]
399 except IndexError as exc:
400 raise ValueError(
401 "escape character in %r at %d before end of string" % (value, i)
402 ) from exc
403 except KeyError as exc:
404 raise ValueError(
405 "escape character followed by unknown character "
406 "%s at %d in %r" % (value[i], i, value)
407 ) from exc
408 if whitespace:
409 ret.extend(whitespace)
410 whitespace = bytearray()
411 ret.append(v)
412 elif c == ord(b'"'):
413 in_quotes = not in_quotes
414 elif c in _COMMENT_CHARS and not in_quotes:
415 # the rest of the line is a comment
416 break
417 elif c in _WHITESPACE_CHARS:
418 whitespace.append(c)
419 else:
420 if whitespace:
421 ret.extend(whitespace)
422 whitespace = bytearray()
423 ret.append(c)
424 i += 1
426 if in_quotes:
427 raise ValueError("missing end quote")
429 return bytes(ret)
432def _escape_value(value: bytes) -> bytes:
433 """Escape a value."""
434 value = value.replace(b"\\", b"\\\\")
435 value = value.replace(b"\r", b"\\r")
436 value = value.replace(b"\n", b"\\n")
437 value = value.replace(b"\t", b"\\t")
438 value = value.replace(b'"', b'\\"')
439 return value
442def _check_variable_name(name: bytes) -> bool:
443 for i in range(len(name)):
444 c = name[i : i + 1]
445 if not c.isalnum() and c != b"-":
446 return False
447 return True
450def _check_section_name(name: bytes) -> bool:
451 for i in range(len(name)):
452 c = name[i : i + 1]
453 if not c.isalnum() and c not in (b"-", b"."):
454 return False
455 return True
458def _strip_comments(line: bytes) -> bytes:
459 comment_bytes = {ord(b"#"), ord(b";")}
460 quote = ord(b'"')
461 string_open = False
462 # Normalize line to bytearray for simple 2/3 compatibility
463 for i, character in enumerate(bytearray(line)):
464 # Comment characters outside balanced quotes denote comment start
465 if character == quote:
466 string_open = not string_open
467 elif not string_open and character in comment_bytes:
468 return line[:i]
469 return line
def _parse_section_header_line(line: bytes) -> Tuple[Section, bytes]:
    """Parse a section header such as ``[bla]`` or ``[bla "sub"]``.

    Args:
      line: A line whose first byte is the opening ``[``
    Returns:
      Tuple of the section and whatever followed the closing ``]`` (with
      comments and trailing whitespace already removed)
    Raises:
      ValueError: if the closing ``]`` is missing, the subsection is not
        wrapped in double quotes, or the section name is invalid
    """
    line = _strip_comments(line).rstrip()

    double_quote = ord(b'"')
    backslash = ord(b"\\")
    closing_bracket = ord(b"]")

    # Locate the first "]" that is neither quoted nor preceded by a backslash.
    inside_quotes = False
    skip_next = False
    for position, byte in enumerate(line):
        if skip_next:
            skip_next = False
        elif byte == double_quote:
            inside_quotes = not inside_quotes
        elif byte == backslash:
            skip_next = True
        elif byte == closing_bracket and not inside_quotes:
            end = position
            break
    else:
        raise ValueError("expected trailing ]")

    header = line[1:end]
    remainder = line[end + 1 :]
    parts = header.split(b" ", 1)

    section: Section
    if len(parts) == 2:
        # '[name "subsection"]' form.
        name, subsection = parts
        if subsection[:1] != b'"' or subsection[-1:] != b'"':
            raise ValueError(f"Invalid subsection {subsection!r}")
        subsection = subsection[1:-1]
        if not _check_section_name(name):
            raise ValueError(f"invalid section name {name!r}")
        section = (name, subsection)
    else:
        # '[name]' or '[name.subsection]' form.
        name = parts[0]
        if not _check_section_name(name):
            raise ValueError(f"invalid section name {name!r}")
        section = tuple(name.split(b".", 1))
    return section, remainder
class ConfigFile(ConfigDict):
    """A Git configuration file, like .git/config or ~/.gitconfig."""

    def __init__(
        self,
        values: Union[
            MutableMapping[Section, MutableMapping[Name, Value]], None
        ] = None,
        encoding: Union[str, None] = None,
    ) -> None:
        """Create a configuration that is not yet tied to a path."""
        super().__init__(values=values, encoding=encoding)
        # Set by from_path(); used as the default target of write_to_path().
        self.path: Optional[str] = None

    @classmethod
    def from_file(cls, f: BinaryIO) -> "ConfigFile":
        """Read configuration from a file-like object.

        Raises:
          ValueError: on a setting that appears before any section header,
            an invalid variable name, or any error raised while parsing a
            section header or a value
        """
        ret = cls()
        section: Optional[Section] = None
        # While a value is being continued across lines, ``setting`` holds
        # its name and ``continuation`` the raw text gathered so far.
        setting = None
        continuation = None
        for lineno, line in enumerate(f.readlines()):
            if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
                # Skip a UTF-8 byte order mark at the start of the file.
                line = line[3:]
            line = line.lstrip()

            if setting is not None:  # continuation line
                if line.endswith(b"\\\n"):
                    continuation += line[:-2]
                elif line.endswith(b"\\\r\n"):
                    continuation += line[:-3]
                else:
                    continuation += line
                    ret._values[section][setting] = _parse_string(continuation)
                    continuation = None
                    setting = None
                continue

            if line[:1] == b"[":
                section, line = _parse_section_header_line(line)
                ret._values.setdefault(section)
            if _strip_comments(line).strip() == b"":
                continue
            if section is None:
                raise ValueError(f"setting {line!r} without section")

            key, separator, value = line.partition(b"=")
            if not separator:
                # A bare variable name is treated as "true".
                value = b"true"
            setting = key.strip()
            if not _check_variable_name(setting):
                raise ValueError(f"invalid variable name {setting!r}")

            if value.endswith(b"\\\n"):
                continuation = value[:-2]
            elif value.endswith(b"\\\r\n"):
                continuation = value[:-3]
            else:
                continuation = None
                ret._values[section][setting] = _parse_string(value)
                setting = None
        return ret

    @classmethod
    def from_path(cls, path: str) -> "ConfigFile":
        """Read configuration from a file on disk and remember its path."""
        with GitFile(path, "rb") as f:
            config = cls.from_file(f)
            config.path = path
            return config

    def write_to_path(self, path: Optional[str] = None) -> None:
        """Write configuration to a file on disk.

        Args:
          path: Target path; defaults to ``self.path``
        """
        target = self.path if path is None else path
        with GitFile(target, "wb") as f:
            self.write_to_file(f)

    def write_to_file(self, f: BinaryIO) -> None:
        """Write configuration to a file-like object."""
        for section, values in self._values.items():
            try:
                section_name, subsection_name = section
            except ValueError:
                (section_name,) = section
                subsection_name = None

            if subsection_name is None:
                header = b"[" + section_name + b"]\n"
            else:
                header = b"[" + section_name + b' "' + subsection_name + b'"]\n'
            f.write(header)

            for key, value in values.items():
                f.write(b"\t" + key + b" = " + _format_string(value) + b"\n")
def get_xdg_config_home_path(*path_segments):
    """Join ``path_segments`` onto the XDG configuration directory.

    The directory is taken from the ``XDG_CONFIG_HOME`` environment
    variable, falling back to ``~/.config/`` when it is not set.
    """
    fallback = os.path.expanduser("~/.config/")
    base_dir = os.environ.get("XDG_CONFIG_HOME", fallback)
    return os.path.join(base_dir, *path_segments)
614def _find_git_in_win_path():
615 for exe in ("git.exe", "git.cmd"):
616 for path in os.environ.get("PATH", "").split(";"):
617 if os.path.exists(os.path.join(path, exe)):
618 # in windows native shells (powershell/cmd) exe path is
619 # .../Git/bin/git.exe or .../Git/cmd/git.exe
620 #
621 # in git-bash exe path is .../Git/mingw64/bin/git.exe
622 git_dir, _bin_dir = os.path.split(path)
623 yield git_dir
624 parent_dir, basename = os.path.split(git_dir)
625 if basename == "mingw32" or basename == "mingw64":
626 yield parent_dir
627 break
def _find_git_in_win_reg():
    """Yield Git install locations recorded in the Windows registry.

    The ``InstallLocation`` value of the ``Git_is1`` uninstall key is read
    from ``HKEY_CURRENT_USER`` and then ``HKEY_LOCAL_MACHINE``.  A hive is
    skipped when accessing it raises ``OSError``, and a value is only
    yielded when its registry type is ``REG_SZ``.
    """
    import platform
    import winreg

    on_amd64 = platform.machine() == "AMD64"
    subkey = (
        "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
        if on_amd64
        else "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
    )

    hives = (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE)  # type: ignore
    for hive in hives:
        with suppress(OSError), winreg.OpenKey(hive, subkey) as handle:  # type: ignore
            location, value_type = winreg.QueryValueEx(handle, "InstallLocation")  # type: ignore
            if value_type == winreg.REG_SZ:  # type: ignore
                yield location
650# There is no set standard for system config dirs on windows. We try the
651# following:
652# - %PROGRAMDATA%/Git/config - (deprecated) Windows config dir per CGit docs
653# - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
654# Used if CGit installation (Git/bin/git.exe) is found in PATH or in the
655# system registry
def get_win_system_paths():
    """Yield candidate system-wide config file paths on Windows.

    First ``%PROGRAMDATA%/Git/config`` when ``PROGRAMDATA`` is set, then
    ``<git_dir>/etc/gitconfig`` for each directory found through ``PATH``
    and, after those, for each one found through the registry.
    """
    program_data = os.environ.get("PROGRAMDATA")
    if program_data is not None:
        yield os.path.join(program_data, "Git", "config")

    for finder in (_find_git_in_win_path, _find_git_in_win_reg):
        for git_dir in finder():
            yield os.path.join(git_dir, "etc", "gitconfig")
class StackedConfig(Config):
    """Configuration which reads from multiple config files."""

    def __init__(
        self, backends: List[ConfigFile], writable: Optional[ConfigFile] = None
    ) -> None:
        """Create a stacked configuration.

        Args:
          backends: Config files to read from, consulted in order
          writable: Optional config file that receives ``set`` calls
        """
        self.backends = backends
        self.writable = writable

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} for {self.backends!r}>"

    @classmethod
    def default(cls) -> "StackedConfig":
        """Create an instance over ``default_backends()``."""
        return cls(cls.default_backends())

    @classmethod
    def default_backends(cls) -> List[ConfigFile]:
        """Retrieve the default configuration.

        See git-config(1) for details on the files searched.  Files that
        do not exist are skipped.
        """
        paths = [
            os.path.expanduser("~/.gitconfig"),
            get_xdg_config_home_path("git", "config"),
        ]

        if "GIT_CONFIG_NOSYSTEM" not in os.environ:
            paths.append("/etc/gitconfig")
            if sys.platform == "win32":
                paths.extend(get_win_system_paths())

        backends = []
        for path in paths:
            with suppress(FileNotFoundError):
                backends.append(ConfigFile.from_path(path))
        return backends

    def get(self, section: SectionLike, name: NameLike) -> Value:
        """Return the setting from the first backend that has it.

        Raises:
          KeyError: if no backend has the setting
        """
        if not isinstance(section, tuple):
            section = (section,)
        for backend in self.backends:
            with suppress(KeyError):
                return backend.get(section, name)
        raise KeyError(name)

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Yield the values of a setting from every backend, in order.

        Backends that raise ``KeyError`` are skipped.
        """
        if not isinstance(section, tuple):
            section = (section,)
        for backend in self.backends:
            with suppress(KeyError):
                yield from backend.get_multivar(section, name)

    def set(
        self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
    ) -> None:
        """Store a setting in the writable backend.

        Raises:
          NotImplementedError: if no writable backend was configured
        """
        writable = self.writable
        if writable is None:
            raise NotImplementedError(self.set)
        return writable.set(section, name, value)

    def sections(self) -> Iterator[Section]:
        """Yield each distinct section once, in order of first appearance."""
        already_seen = set()
        for backend in self.backends:
            for section in backend.sections():
                if section in already_seen:
                    continue
                already_seen.add(section)
                yield section
def read_submodules(path: str) -> Iterator[Tuple[bytes, bytes, bytes]]:
    """Read a .gitmodules file.

    Args:
      path: Path of the file to load
    Returns:
      Whatever ``parse_submodules`` produces for the loaded file
    """
    return parse_submodules(ConfigFile.from_path(path))
def parse_submodules(config: "ConfigFile") -> Iterator[Tuple[bytes, bytes, bytes]]:
    """Parse a gitmodules GitConfig file, returning submodules.

    Sections that are not of the two-part ``[submodule "name"]`` form are
    ignored, so a file that also holds other sections does not make the
    parser fail.

    Args:
      config: A `ConfigFile`
    Returns:
      iterator of tuples (submodule path, url, name),
      where name is quoted part of the section's name.
    """
    for section in config.keys():
        # Only (kind, name) sections can describe a submodule; unpacking a
        # one-element section such as (b"core",) would raise ValueError.
        if len(section) != 2:
            continue
        section_kind, section_name = section
        if section_kind != b"submodule":
            continue
        try:
            sm_path = config.get(section, b"path")
            sm_url = config.get(section, b"url")
        except KeyError:
            # If either path or url is missing, just ignore this
            # submodule entry and move on to the next one. This is
            # how git itself handles malformed .gitmodule entries.
            continue
        yield (sm_path, sm_url, section_name)
def iter_instead_of(config: "Config", push: bool = False) -> Iterable[Tuple[str, str]]:
    """Iterate over insteadOf / pushInsteadOf values.

    Only ``[url "<replacement>"]`` sections are considered; a ``url``
    section without a subsection carries no replacement and is skipped.

    Args:
      config: Configuration to read the ``url`` sections from
      push: Also include ``pushInsteadOf`` values
    Returns:
      Iterator over (needle, replacement) pairs, both decoded as UTF-8
    """
    for section in config.sections():
        if section[0] != b"url":
            continue
        # A bare "[url]" section has no subsection to use as replacement;
        # indexing section[1] on it would raise IndexError.
        if len(section) < 2:
            continue
        replacement = section[1]
        try:
            needles = list(config.get_multivar(section, "insteadOf"))
        except KeyError:
            needles = []
        if push:
            try:
                needles += list(config.get_multivar(section, "pushInsteadOf"))
            except KeyError:
                pass
        for needle in needles:
            assert isinstance(needle, bytes)
            yield needle.decode("utf-8"), replacement.decode("utf-8")
def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
    """Apply insteadOf / pushInsteadOf to a URL.

    Among all needles that ``orig_url`` starts with, the longest one wins
    (the first one on ties) and is swapped for its replacement.  The URL is
    returned unchanged when no non-empty needle matches.
    """
    best_length = 0
    result = orig_url
    for needle, replacement in iter_instead_of(config, push):
        if orig_url.startswith(needle) and len(needle) > best_length:
            best_length = len(needle)
            result = replacement + orig_url[len(needle) :]
    return result