Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/config.py: 61%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# config.py - Reading and writing Git config files
2# Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Reading and writing Git configuration files.
24Todo:
25 * preserve formatting when updating configuration files
26"""
# Names exported by a star-import of this module.
__all__ = [
    "DEFAULT_MAX_INCLUDE_DEPTH",
    "MAX_INCLUDE_FILE_SIZE",
    "CaseInsensitiveOrderedMultiDict",
    "ConditionMatcher",
    "Config",
    "ConfigDict",
    "ConfigFile",
    "ConfigKey",
    "ConfigValue",
    "FileOpener",
    "StackedConfig",
    "apply_instead_of",
    "get_win_legacy_system_paths",
    "get_win_system_paths",
    "get_xdg_config_home_path",
    "iter_instead_of",
    "lower_key",
    "match_glob_pattern",
    "parse_submodules",
    "read_submodules",
]
51import logging
52import os
53import re
54import sys
55from collections.abc import (
56 Callable,
57 ItemsView,
58 Iterable,
59 Iterator,
60 KeysView,
61 Mapping,
62 MutableMapping,
63 ValuesView,
64)
65from contextlib import suppress
66from pathlib import Path
67from typing import (
68 IO,
69 Generic,
70 TypeVar,
71 overload,
72)
74from .file import GitFile, _GitFile
# A key is either a single name or a tuple of name components
# (lower_key() treats the first tuple element as the section name).
ConfigKey = str | bytes | tuple[str | bytes, ...]
# Union of Python types used for configuration values.
ConfigValue = str | bytes | bool | int

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type for file opener callback
FileOpener = Callable[[str | os.PathLike[str]], IO[bytes]]

# Type for includeIf condition matcher
# Takes the condition value (e.g., "main" for onbranch:main) and returns bool
ConditionMatcher = Callable[[str], bool]

# Security limits for include files
MAX_INCLUDE_FILE_SIZE = 1024 * 1024  # 1MB max for included config files
DEFAULT_MAX_INCLUDE_DEPTH = 10  # Maximum recursion depth for includes
93def _match_gitdir_pattern(
94 path: bytes, pattern: bytes, ignorecase: bool = False
95) -> bool:
96 """Simple gitdir pattern matching for includeIf conditions.
98 This handles the basic gitdir patterns used in includeIf directives.
99 """
100 # Convert to strings for easier manipulation
101 path_str = path.decode("utf-8", errors="replace")
102 pattern_str = pattern.decode("utf-8", errors="replace")
104 # Normalize paths to use forward slashes for consistent matching
105 path_str = path_str.replace("\\", "/")
106 pattern_str = pattern_str.replace("\\", "/")
108 if ignorecase:
109 path_str = path_str.lower()
110 pattern_str = pattern_str.lower()
112 # Handle the common cases for gitdir patterns
113 if pattern_str.startswith("**/") and pattern_str.endswith("/**"):
114 # Pattern like **/dirname/** should match any path containing dirname
115 dirname = pattern_str[3:-3] # Remove **/ and /**
116 # Check if path contains the directory name as a path component
117 return ("/" + dirname + "/") in path_str or path_str.endswith("/" + dirname)
118 elif pattern_str.startswith("**/"):
119 # Pattern like **/filename
120 suffix = pattern_str[3:] # Remove **/
121 return suffix in path_str or path_str.endswith("/" + suffix)
122 elif pattern_str.endswith("/**"):
123 # Pattern like /path/to/dir/** should match /path/to/dir and any subdirectory
124 base_pattern = pattern_str[:-3] # Remove /**
125 return path_str == base_pattern or path_str.startswith(base_pattern + "/")
126 elif "**" in pattern_str:
127 # Handle patterns with ** in the middle
128 parts = pattern_str.split("**")
129 if len(parts) == 2:
130 prefix, suffix = parts
131 # Path must start with prefix and end with suffix (if any)
132 if prefix and not path_str.startswith(prefix):
133 return False
134 if suffix and not path_str.endswith(suffix):
135 return False
136 return True
138 # Direct match or simple glob pattern
139 if "*" in pattern_str or "?" in pattern_str or "[" in pattern_str:
140 import fnmatch
142 return fnmatch.fnmatch(path_str, pattern_str)
143 else:
144 return path_str == pattern_str
def match_glob_pattern(value: str, pattern: str) -> bool:
    r"""Match a value against a glob pattern.

    Supports simple glob patterns: ``**`` matches any run of characters and
    ``*`` matches any run of characters other than ``/``. Every other
    character in the pattern is matched literally. The whole value must match.

    Args:
        value: String to test
        pattern: Glob pattern

    Returns:
        True if the entire value matches the pattern

    Raises:
        ValueError: If the pattern is invalid
    """
    escaped = re.escape(pattern)
    # Order matters: translate the escaped ** first, otherwise the single-*
    # replacement would consume both halves of it.
    regex = escaped.replace(r"\*\*", ".*").replace(r"\*", "[^/]*")

    try:
        # fullmatch instead of ^...$: "$" would also accept a value that has
        # a trailing newline after the match.
        return re.fullmatch(regex, value) is not None
    except re.error as e:
        raise ValueError(f"Invalid glob pattern {pattern!r}: {e}") from e
def lower_key(key: ConfigKey) -> ConfigKey:
    """Normalise the case of a config key for case-insensitive lookup.

    Args:
        key: Configuration key (str, bytes, or tuple)

    Returns:
        For str/bytes the lowercased key. For a tuple, a tuple whose first
        element (the section name) is lowercased and whose remaining
        elements (subsection names) are left untouched; an empty tuple is
        returned as is.

    Raises:
        TypeError: If key is not str, bytes, or tuple
    """
    if isinstance(key, tuple):
        if not key:
            return key
        head, *tail = key
        assert isinstance(head, (bytes, str))
        return (head.lower(), *tail)

    if isinstance(key, (str, bytes)):
        return key.lower()

    raise TypeError(key)
# Type variables for the generic multi-dict defined in this module.
K = TypeVar("K", bound=ConfigKey)  # Key type must be ConfigKey
V = TypeVar("V")  # Value type
_T = TypeVar("_T")  # For get() default parameter
class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
    """A case-insensitive ordered dictionary that can store multiple values per key.

    This class maintains the order of insertions and allows multiple values
    for the same key. Keys are compared case-insensitively.

    Two structures are kept in step: ``_real`` holds every ``(key, value)``
    pair in insertion order with the key's original spelling, and ``_keyed``
    maps the normalised key (see ``lower_key``) to the most recently
    assigned value.
    """

    def __init__(self, default_factory: Callable[[], V] | None = None) -> None:
        """Initialize a CaseInsensitiveOrderedMultiDict.

        Args:
            default_factory: Optional factory function for default values
        """
        self._real: list[tuple[K, V]] = []
        self._keyed: dict[ConfigKey, V] = {}
        self._default_factory = default_factory

    @classmethod
    def make(
        cls,
        dict_in: "MutableMapping[K, V] | CaseInsensitiveOrderedMultiDict[K, V] | None" = None,
        default_factory: Callable[[], V] | None = None,
    ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
        """Create a CaseInsensitiveOrderedMultiDict from an existing mapping.

        Args:
            dict_in: Optional mapping to initialize from
            default_factory: Optional factory function for default values

        Returns:
            ``dict_in`` itself when it is already an instance of this class
            (``default_factory`` is ignored in that case), otherwise a new
            instance populated from ``dict_in``

        Raises:
            TypeError: If dict_in is not a mapping or None
        """
        if isinstance(dict_in, cls):
            return dict_in

        out = cls(default_factory=default_factory)

        if dict_in is None:
            return out

        if not isinstance(dict_in, MutableMapping):
            raise TypeError

        for key, value in dict_in.items():
            out[key] = value

        return out

    def __len__(self) -> int:
        """Return the number of unique keys in the dictionary."""
        return len(self._keyed)

    def keys(self) -> KeysView[K]:
        """Return a view of the dictionary's keys.

        The view is a snapshot of the distinct keys with their original
        (first seen) spelling, in insertion order.
        """

        class UniqueKeysView(KeysView[K]):
            """Keys view over a fixed, deduplicated list of keys."""

            def __init__(
                self, mapping: CaseInsensitiveOrderedMultiDict[K, V], keys: list[K]
            ):
                # The base class stores the mapping it reports in repr();
                # without this call repr(view) fails with AttributeError.
                super().__init__(mapping)
                self._keys = keys

            def __contains__(self, key: object) -> bool:
                return key in self._keys

            def __iter__(self) -> Iterator[K]:
                return iter(self._keys)

            def __len__(self) -> int:
                return len(self._keys)

        # __iter__ already yields each distinct key once, in order.
        return UniqueKeysView(self, list(iter(self)))

    def items(self) -> ItemsView[K, V]:
        """Return a view of the dictionary's (key, value) pairs in insertion order.

        Unlike ``len(self)``, the view counts and yields every stored pair,
        including repeated assignments to the same key.
        """

        # Return a view that iterates over the real list to preserve order
        class OrderedItemsView(ItemsView[K, V]):
            """Items view that preserves insertion order."""

            def __init__(self, mapping: CaseInsensitiveOrderedMultiDict[K, V]):
                self._mapping = mapping

            def __iter__(self) -> Iterator[tuple[K, V]]:
                return iter(self._mapping._real)

            def __len__(self) -> int:
                return len(self._mapping._real)

            def __contains__(self, item: object) -> bool:
                if not isinstance(item, tuple) or len(item) != 2:
                    return False
                key, value = item
                return any(k == key and v == value for k, v in self._mapping._real)

        return OrderedItemsView(self)

    def __iter__(self) -> Iterator[K]:
        """Iterate over the dictionary's keys.

        Keys are yielded with their original spelling; a key that was
        assigned several times (in any letter case) is yielded only once.
        """
        seen = set()
        for k, _ in self._real:
            lower = lower_key(k)
            if lower not in seen:
                seen.add(lower)
                yield k

    def values(self) -> ValuesView[V]:
        """Return a view of the last value stored for each distinct key."""
        return self._keyed.values()

    def __setitem__(self, key: K, value: V) -> None:
        """Set a value for a key, appending to existing values."""
        self._real.append((key, value))
        self._keyed[lower_key(key)] = value

    def set(self, key: K, value: V) -> None:
        """Set a value for a key, replacing all existing values.

        Args:
            key: The key to set
            value: The value to set
        """
        lower = lower_key(key)
        self._real = [(k, v) for k, v in self._real if lower_key(k) != lower]
        self._real.append((key, value))
        self._keyed[lower] = value

    def __delitem__(self, key: K) -> None:
        """Delete all values for a key.

        Raises:
            KeyError: If the key is not found
        """
        lower_k = lower_key(key)
        # Raises KeyError for an unknown key before _real is touched.
        del self._keyed[lower_k]
        # Single filtering pass, updating the existing list in place.
        self._real[:] = [(k, v) for k, v in self._real if lower_key(k) != lower_k]

    def __getitem__(self, item: K) -> V:
        """Get the last value for a key.

        Raises:
            KeyError: If the key is not found
        """
        return self._keyed[lower_key(item)]

    def get(self, key: K, /, default: V | _T | None = None) -> V | _T | None:  # type: ignore[override]
        """Get the last value for a key, or a default if not found.

        Args:
            key: The key to look up
            default: Default value to return if key not found

        Returns:
            The value for the key. If the key is missing: ``default`` when it
            is not None, otherwise a fresh ``default_factory()`` result when
            a factory was configured (the result is not stored), otherwise
            None.
        """
        try:
            return self[key]
        except KeyError:
            if default is not None:
                return default
            elif self._default_factory is not None:
                return self._default_factory()
            else:
                return None

    def get_all(self, key: K) -> Iterator[V]:
        """Get all values for a key in insertion order.

        Args:
            key: The key to look up

        Returns:
            Iterator of all values for the key
        """
        lowered_key = lower_key(key)
        for actual, value in self._real:
            if lower_key(actual) == lowered_key:
                yield value

    def setdefault(self, key: K, default: V | None = None) -> V:
        """Get value for key, setting it to default if not present.

        Args:
            key: The key to look up
            default: Default value to set if key not found

        Returns:
            The existing value or the newly set default

        Raises:
            KeyError: If key not found and no default or default_factory
        """
        try:
            return self[key]
        except KeyError:
            if default is not None:
                self[key] = default
                return default
            elif self._default_factory is not None:
                value = self._default_factory()
                self[key] = value
                return value
            else:
                raise
# The plain aliases are the bytes forms; the ``*Like`` aliases additionally
# admit str (ConfigDict._check_section_and_name encodes str input to bytes).
Name = bytes
NameLike = bytes | str
Section = tuple[bytes, ...]
SectionLike = bytes | str | tuple[bytes | str, ...]
Value = bytes
ValueLike = bytes | str
430class Config:
431 """A Git configuration."""
433 def get(self, section: SectionLike, name: NameLike) -> Value:
434 """Retrieve the contents of a configuration setting.
436 Args:
437 section: Tuple with section name and optional subsection name
438 name: Variable name
439 Returns:
440 Contents of the setting
441 Raises:
442 KeyError: if the value is not set
443 """
444 raise NotImplementedError(self.get)
446 def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
447 """Retrieve the contents of a multivar configuration setting.
449 Args:
450 section: Tuple with section name and optional subsection namee
451 name: Variable name
452 Returns:
453 Contents of the setting as iterable
454 Raises:
455 KeyError: if the value is not set
456 """
457 raise NotImplementedError(self.get_multivar)
459 @overload
460 def get_boolean(
461 self, section: SectionLike, name: NameLike, default: bool
462 ) -> bool: ...
464 @overload
465 def get_boolean(self, section: SectionLike, name: NameLike) -> bool | None: ...
467 def get_boolean(
468 self, section: SectionLike, name: NameLike, default: bool | None = None
469 ) -> bool | None:
470 """Retrieve a configuration setting as boolean.
472 Args:
473 section: Tuple with section name and optional subsection name
474 name: Name of the setting, including section and possible
475 subsection.
476 default: Default value if setting is not found
478 Returns:
479 Contents of the setting
480 """
481 try:
482 value = self.get(section, name)
483 except KeyError:
484 return default
485 if value.lower() == b"true":
486 return True
487 elif value.lower() == b"false":
488 return False
489 raise ValueError(f"not a valid boolean string: {value!r}")
491 def set(
492 self, section: SectionLike, name: NameLike, value: ValueLike | bool
493 ) -> None:
494 """Set a configuration value.
496 Args:
497 section: Tuple with section name and optional subsection namee
498 name: Name of the configuration value, including section
499 and optional subsection
500 value: value of the setting
501 """
502 raise NotImplementedError(self.set)
504 def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
505 """Iterate over the configuration pairs for a specific section.
507 Args:
508 section: Tuple with section name and optional subsection namee
509 Returns:
510 Iterator over (name, value) pairs
511 """
512 raise NotImplementedError(self.items)
514 def sections(self) -> Iterator[Section]:
515 """Iterate over the sections.
517 Returns: Iterator over section tuples
518 """
519 raise NotImplementedError(self.sections)
521 def has_section(self, name: Section) -> bool:
522 """Check if a specified section exists.
524 Args:
525 name: Name of section to check for
526 Returns:
527 boolean indicating whether the section exists
528 """
529 return name in self.sections()
class ConfigDict(Config):
    """Git configuration stored in a dictionary.

    Sections are kept in a ``CaseInsensitiveOrderedMultiDict`` keyed by
    section tuple; each section is itself such a dict mapping setting names
    to values. Section names, setting names and values are stored as bytes;
    str arguments are encoded with ``self.encoding``.
    """

    def __init__(
        self,
        values: MutableMapping[Section, CaseInsensitiveOrderedMultiDict[Name, Value]]
        | None = None,
        encoding: str | None = None,
    ) -> None:
        """Create a new ConfigDict.

        Args:
            values: Optional initial mapping of section to settings
            encoding: Encoding used for str arguments; defaults to
                ``sys.getdefaultencoding()``
        """
        if encoding is None:
            encoding = sys.getdefaultencoding()
        self.encoding = encoding
        self._values: CaseInsensitiveOrderedMultiDict[
            Section, CaseInsensitiveOrderedMultiDict[Name, Value]
        ] = CaseInsensitiveOrderedMultiDict.make(
            values, default_factory=CaseInsensitiveOrderedMultiDict
        )

    def __repr__(self) -> str:
        """Return string representation of ConfigDict."""
        return f"{self.__class__.__name__}({self._values!r})"

    def __eq__(self, other: object) -> bool:
        """Check equality with another ConfigDict."""
        return isinstance(other, self.__class__) and other._values == self._values

    def __getitem__(self, key: Section) -> CaseInsensitiveOrderedMultiDict[Name, Value]:
        """Get configuration values for a section.

        Raises:
            KeyError: If section not found
        """
        return self._values.__getitem__(key)

    def __setitem__(
        self, key: Section, value: CaseInsensitiveOrderedMultiDict[Name, Value]
    ) -> None:
        """Set configuration values for a section."""
        return self._values.__setitem__(key, value)

    def __delitem__(self, key: Section) -> None:
        """Delete a configuration section.

        Raises:
            KeyError: If section not found
        """
        return self._values.__delitem__(key)

    def __iter__(self) -> Iterator[Section]:
        """Iterate over configuration sections."""
        return self._values.__iter__()

    def __len__(self) -> int:
        """Return the number of sections."""
        return self._values.__len__()

    def keys(self) -> KeysView[Section]:
        """Return a view of section names."""
        return self._values.keys()

    @classmethod
    def _parse_setting(cls, name: str) -> tuple[str, str | None, str]:
        """Split a dotted setting name into (section, subsection, variable).

        The section is the text before the first dot and the variable the
        text after the last dot; whatever lies in between (which may itself
        contain dots, e.g. a URL) is the subsection, or None if absent.

        Raises:
            IndexError: If name contains no dot
        """
        parts = name.split(".")
        if len(parts) >= 3:
            # Re-join the middle so dotted subsections stay in one piece.
            return (parts[0], ".".join(parts[1:-1]), parts[-1])
        return (parts[0], None, parts[1])

    def _check_section_and_name(
        self, section: SectionLike, name: NameLike
    ) -> tuple[Section, Name]:
        """Normalise a section and setting name to their bytes forms.

        A non-tuple section is wrapped in a one-element tuple; str components
        and a str name are encoded with ``self.encoding``.
        """
        if not isinstance(section, tuple):
            section = (section,)

        checked_section = tuple(
            part if isinstance(part, bytes) else part.encode(self.encoding)
            for part in section
        )

        if not isinstance(name, bytes):
            name = name.encode(self.encoding)

        return checked_section, name

    def _encode_value(self, value: ValueLike | bool) -> Value:
        """Convert a value to bytes (bools become ``true``/``false``)."""
        if isinstance(value, bool):
            return b"true" if value else b"false"
        if isinstance(value, bytes):
            return value
        return value.encode(self.encoding)

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Get multiple values for a configuration setting.

        Args:
            section: Section name
            name: Setting name

        Returns:
            Iterator of configuration values

        Raises:
            KeyError: If neither the section nor its parent section exists
        """
        section, name = self._check_section_and_name(section, name)

        if len(section) > 1:
            try:
                return self._values[section].get_all(name)
            except KeyError:
                # Unknown subsection: fall back to the bare section below.
                pass

        return self._values[(section[0],)].get_all(name)

    def get(
        self,
        section: SectionLike,
        name: NameLike,
    ) -> Value:
        """Get a configuration value.

        Args:
            section: Section name
            name: Setting name

        Returns:
            Configuration value

        Raises:
            KeyError: if the value is not set
        """
        section, name = self._check_section_and_name(section, name)

        if len(section) > 1:
            try:
                return self._values[section][name]
            except KeyError:
                # Not set in the subsection: fall back to the bare section.
                pass

        return self._values[(section[0],)][name]

    def set(
        self,
        section: SectionLike,
        name: NameLike,
        value: ValueLike | bool,
    ) -> None:
        """Set a configuration value, replacing any existing values.

        Args:
            section: Section name
            name: Setting name
            value: Configuration value
        """
        section, name = self._check_section_and_name(section, name)
        encoded = self._encode_value(value)

        section_dict = self._values.setdefault(section)
        if hasattr(section_dict, "set"):
            section_dict.set(name, encoded)
        else:
            # Section mapping without a replacing set(): plain assignment.
            section_dict[name] = encoded

    def add(
        self,
        section: SectionLike,
        name: NameLike,
        value: ValueLike | bool,
    ) -> None:
        """Add a value to a configuration setting, creating a multivar if needed.

        Args:
            section: Section name
            name: Setting name
            value: Configuration value to append
        """
        section, name = self._check_section_and_name(section, name)
        self._values.setdefault(section)[name] = self._encode_value(value)

    def remove(self, section: SectionLike, name: NameLike) -> None:
        """Remove a configuration setting.

        Args:
            section: Section name
            name: Setting name

        Raises:
            KeyError: If the section or name doesn't exist
        """
        section, name = self._check_section_and_name(section, name)
        del self._values[section][name]

    def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
        """Get items in a section.

        Args:
            section: Section name

        Returns:
            Iterator over the section's (name, value) pairs; empty if the
            section has no settings
        """
        section_bytes, _ = self._check_section_and_name(section, b"")
        section_dict = self._values.get(section_bytes)
        if section_dict is not None:
            return iter(section_dict.items())
        return iter([])

    def sections(self) -> Iterator[Section]:
        """Get all sections."""
        return iter(self._values.keys())
def _format_string(value: bytes) -> bytes:
    """Render a value for writing to a config file.

    The value is escaped, and additionally wrapped in double quotes when it
    would otherwise be altered on re-reading: leading or trailing
    whitespace is stripped by the parser, and ``#`` or ``;`` outside quotes
    starts a comment.
    """
    needs_quotes = (
        value.startswith((b" ", b"\t"))
        or value.endswith((b" ", b"\t"))
        or b"#" in value
        or b";" in value
    )
    escaped = _escape_value(value)
    if needs_quotes:
        return b'"' + escaped + b'"'
    return escaped
749_ESCAPE_TABLE = {
750 ord(b"\\"): ord(b"\\"),
751 ord(b'"'): ord(b'"'),
752 ord(b"n"): ord(b"\n"),
753 ord(b"t"): ord(b"\t"),
754 ord(b"b"): ord(b"\b"),
755}
756_COMMENT_CHARS = [ord(b"#"), ord(b";")]
757_WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
def _parse_string(value: bytes) -> bytes:
    """Decode the raw text of a config value.

    Surrounding whitespace is stripped, double quotes are removed, escape
    sequences listed in ``_ESCAPE_TABLE`` are decoded, and an unquoted
    comment character ends the value. Whitespace inside quotes is kept
    verbatim; unquoted whitespace is kept only when followed by further
    content. A backslash that does not start a known escape (or that ends
    the value) is kept as a literal backslash.

    Raises:
        ValueError: If a double quote is left unclosed
    """
    backslash = ord(b"\\")
    quote = ord(b'"')

    value_array = bytearray(value.strip())
    ret = bytearray()
    # Unquoted whitespace seen since the last emitted byte; only written out
    # once something follows it, so trailing whitespace is dropped.
    whitespace = bytearray()
    in_quotes = False
    i = 0
    while i < len(value_array):
        c = value_array[i]

        if not in_quotes:
            if c in _COMMENT_CHARS:
                # the rest of the line is a comment
                break
            if c in _WHITESPACE_CHARS:
                whitespace.append(c)
                i += 1
                continue

        # Anything else (including a quote) proves the pending whitespace
        # was interior to the value, so emit it first.
        if whitespace:
            ret.extend(whitespace)
            whitespace = bytearray()

        if c == backslash:
            following = value_array[i + 1] if i + 1 < len(value_array) else None
            if following in _ESCAPE_TABLE:
                ret.append(_ESCAPE_TABLE[following])
                i += 1  # also consume the escaped byte
            else:
                # Trailing backslash or unknown escape: keep the backslash
                # and let the next byte be processed normally.
                ret.append(backslash)
        elif c == quote:
            in_quotes = not in_quotes
        else:
            ret.append(c)
        i += 1

    if in_quotes:
        raise ValueError("missing end quote")

    return bytes(ret)
810def _escape_value(value: bytes) -> bytes:
811 """Escape a value."""
812 value = value.replace(b"\\", b"\\\\")
813 value = value.replace(b"\r", b"\\r")
814 value = value.replace(b"\n", b"\\n")
815 value = value.replace(b"\t", b"\\t")
816 value = value.replace(b'"', b'\\"')
817 return value
820def _check_variable_name(name: bytes) -> bool:
821 for i in range(len(name)):
822 c = name[i : i + 1]
823 if not c.isalnum() and c != b"-":
824 return False
825 return True
828def _check_section_name(name: bytes) -> bool:
829 for i in range(len(name)):
830 c = name[i : i + 1]
831 if not c.isalnum() and c not in (b"-", b"."):
832 return False
833 return True
836def _strip_comments(line: bytes) -> bytes:
837 comment_bytes = {ord(b"#"), ord(b";")}
838 quote = ord(b'"')
839 string_open = False
840 # Normalize line to bytearray for simple 2/3 compatibility
841 for i, character in enumerate(bytearray(line)):
842 # Comment characters outside balanced quotes denote comment start
843 if character == quote:
844 string_open = not string_open
845 elif not string_open and character in comment_bytes:
846 return line[:i]
847 return line
850def _is_line_continuation(value: bytes) -> bool:
851 """Check if a value ends with a line continuation backslash.
853 A line continuation occurs when a line ends with a backslash that is:
854 1. Not escaped (not preceded by another backslash)
855 2. Not within quotes
857 Args:
858 value: The value to check
860 Returns:
861 True if the value ends with a line continuation backslash
862 """
863 if not value.endswith((b"\\\n", b"\\\r\n")):
864 return False
866 # Remove only the newline characters, keep the content including the backslash
867 if value.endswith(b"\\\r\n"):
868 content = value[:-2] # Remove \r\n, keep the \
869 else:
870 content = value[:-1] # Remove \n, keep the \
872 if not content.endswith(b"\\"):
873 return False
875 # Count consecutive backslashes at the end
876 backslash_count = 0
877 for i in range(len(content) - 1, -1, -1):
878 if content[i : i + 1] == b"\\":
879 backslash_count += 1
880 else:
881 break
883 # If we have an odd number of backslashes, the last one is a line continuation
884 # If we have an even number, they are all escaped and there's no continuation
885 return backslash_count % 2 == 1
def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
    """Parse a section header such as ``[core]`` or ``[remote "origin"]``.

    Args:
        line: A line whose first byte is the opening ``[``

    Returns:
        Tuple of the parsed section and the text that follows the closing
        ``]`` (comments and trailing whitespace already removed)

    Raises:
        ValueError: If the closing ``]`` is missing, the section name is
            invalid, or a subsection is not quoted (only ``includeIf``
            sections, matched case-insensitively, may omit the quotes)
    """
    line = _strip_comments(line).rstrip()

    # Find the first "]" that is neither inside quotes nor backslash-escaped.
    in_quotes = False
    escaped = False
    for i, c in enumerate(line):
        if escaped:
            escaped = False
            continue
        if c == ord(b'"'):
            in_quotes = not in_quotes
        elif c == ord(b"\\"):
            escaped = True
        elif c == ord(b"]") and not in_quotes:
            last = i
            break
    else:
        raise ValueError("expected trailing ]")

    pts = line[1:last].split(b" ", 1)
    line = line[last + 1 :]
    section: Section
    if len(pts) == 2:
        # Handle subsections - Git allows more complex syntax for certain sections like includeIf
        if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
            # Standard quoted subsection
            pts[1] = pts[1][1:-1]
        elif pts[0].lower() == b"includeif":
            # includeIf conditions are accepted without strict quote
            # validation. Section names are case-insensitive, so compare the
            # lowercased name as the include handling elsewhere does.
            pts[1] = pts[1].strip()
            if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
                pts[1] = pts[1][1:-1]
        else:
            # Other sections must have quoted subsections
            raise ValueError(f"Invalid subsection {pts[1]!r}")
        if not _check_section_name(pts[0]):
            raise ValueError(f"invalid section name {pts[0]!r}")
        section = (pts[0], pts[1])
    else:
        if not _check_section_name(pts[0]):
            raise ValueError(f"invalid section name {pts[0]!r}")
        # "[section.subsection]" form: split on the first dot.
        pts = pts[0].split(b".", 1)
        if len(pts) == 2:
            section = (pts[0], pts[1])
        else:
            section = (pts[0],)
    return section, line
937class ConfigFile(ConfigDict):
938 """A Git configuration file, like .git/config or ~/.gitconfig."""
def __init__(
    self,
    values: MutableMapping[Section, CaseInsensitiveOrderedMultiDict[Name, Value]]
    | None = None,
    encoding: str | None = None,
) -> None:
    """Create a ConfigFile.

    Args:
        values: Optional mapping of configuration values
        encoding: Optional encoding for the file (defaults to system encoding)
    """
    # Paths of files already pulled in through includes, used for cycle detection.
    self._included_paths: set[str] = set()
    # No path is associated with the instance at construction time.
    self.path: str | None = None
    super().__init__(values=values, encoding=encoding)
@classmethod
def from_file(
    cls,
    f: IO[bytes],
    *,
    config_dir: str | None = None,
    included_paths: set[str] | None = None,
    include_depth: int = 0,
    max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
    file_opener: FileOpener | None = None,
    condition_matchers: Mapping[str, ConditionMatcher] | None = None,
) -> "ConfigFile":
    """Read configuration from a file-like object.

    The file is read line by line. A line starting with ``[`` opens a new
    section; any other non-blank, non-comment line is a ``name = value``
    setting (a name without ``=`` gets the value ``true``). A value ending
    in an unescaped backslash continues on the following line(s). Each
    completed setting is stored and then checked for include directives.

    Args:
        f: File-like object to read from
        config_dir: Directory containing the config file (for relative includes)
        included_paths: Set of already included paths (to prevent cycles)
        include_depth: Current include depth (to prevent infinite recursion)
        max_include_depth: Maximum allowed include depth
        file_opener: Optional callback to open included files
        condition_matchers: Optional dict of condition matchers for includeIf

    Returns:
        A new instance of ``cls`` holding the parsed settings

    Raises:
        ValueError: If the include depth is exceeded, a setting appears
            before any section header, or a variable name is invalid
            (the parsing helpers may raise ValueError as well)
    """
    if include_depth > max_include_depth:
        # Prevent excessive recursion
        raise ValueError(f"Maximum include depth ({max_include_depth}) exceeded")

    ret = cls()
    if included_paths is not None:
        # Work on a copy so the caller's set is not modified.
        ret._included_paths = included_paths.copy()

    # Parser state: the current section, the setting whose value is still
    # being accumulated (None when not inside a continuation), and the
    # raw value text gathered so far for that setting.
    section: Section | None = None
    setting = None
    continuation = None
    for lineno, line in enumerate(f.readlines()):
        # Drop a UTF-8 byte order mark at the very start of the file.
        if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
            line = line[3:]
        line = line.lstrip()
        if setting is None:
            if len(line) > 0 and line[:1] == b"[":
                # The helper returns whatever follows "]" on the same
                # line; it is then parsed as a setting below.
                section, line = _parse_section_header_line(line)
                ret._values.setdefault(section)
            if _strip_comments(line).strip() == b"":
                continue
            if section is None:
                raise ValueError(f"setting {line!r} without section")
            try:
                setting, value = line.split(b"=", 1)
            except ValueError:
                # No "=" on the line: the variable is treated as true.
                setting = line
                value = b"true"
            setting = setting.strip()
            if not _check_variable_name(setting):
                raise ValueError(f"invalid variable name {setting!r}")
            if _is_line_continuation(value):
                # Keep the text before the backslash/newline and leave
                # ``setting`` set so the next line is appended to it.
                if value.endswith(b"\\\r\n"):
                    continuation = value[:-3]
                else:
                    continuation = value[:-2]
            else:
                continuation = None
                value = _parse_string(value)
                ret._values[section][setting] = value

                # Process include/includeIf directives
                ret._handle_include_directive(
                    section,
                    setting,
                    value,
                    config_dir=config_dir,
                    include_depth=include_depth,
                    max_include_depth=max_include_depth,
                    file_opener=file_opener,
                    condition_matchers=condition_matchers,
                )

                setting = None
        else:  # continuation line
            assert continuation is not None
            if _is_line_continuation(line):
                # Still continuing: append without the backslash/newline.
                if line.endswith(b"\\\r\n"):
                    continuation += line[:-3]
                else:
                    continuation += line[:-2]
            else:
                # Final line of the value: parse the accumulated text.
                continuation += line
                value = _parse_string(continuation)
                assert section is not None  # Already checked above
                ret._values[section][setting] = value

                # Process include/includeIf directives
                ret._handle_include_directive(
                    section,
                    setting,
                    value,
                    config_dir=config_dir,
                    include_depth=include_depth,
                    max_include_depth=max_include_depth,
                    file_opener=file_opener,
                    condition_matchers=condition_matchers,
                )

                continuation = None
                setting = None
    return ret
1062 def _handle_include_directive(
1063 self,
1064 section: Section | None,
1065 setting: bytes,
1066 value: bytes,
1067 *,
1068 config_dir: str | None,
1069 include_depth: int,
1070 max_include_depth: int,
1071 file_opener: FileOpener | None,
1072 condition_matchers: Mapping[str, ConditionMatcher] | None,
1073 ) -> None:
1074 """Handle include/includeIf directives during config parsing."""
1075 if (
1076 section is not None
1077 and setting == b"path"
1078 and (
1079 section[0].lower() == b"include"
1080 or (len(section) > 1 and section[0].lower() == b"includeif")
1081 )
1082 ):
1083 self._process_include(
1084 section,
1085 value,
1086 config_dir=config_dir,
1087 include_depth=include_depth,
1088 max_include_depth=max_include_depth,
1089 file_opener=file_opener,
1090 condition_matchers=condition_matchers,
1091 )
def _process_include(
    self,
    section: Section,
    path_value: bytes,
    *,
    config_dir: str | None,
    include_depth: int,
    max_include_depth: int,
    file_opener: FileOpener | None,
    condition_matchers: Mapping[str, ConditionMatcher] | None,
) -> None:
    """Process an include or includeIf directive.

    Evaluates the includeIf condition (if any), resolves the include path,
    skips paths that were already included, then opens and parses the
    file and merges its settings into this config. Paths that cannot be
    resolved or opened are logged at debug level and skipped.

    Args:
        section: Section the ``path`` setting was found in
        path_value: Raw value of the ``path`` setting
        config_dir: Directory used when resolving the include path
        include_depth: Depth of the file currently being parsed
        max_include_depth: Maximum allowed include depth
        file_opener: Optional callback to open included files
        condition_matchers: Optional dict of condition matchers for includeIf
    """
    path_str = path_value.decode(self.encoding, errors="replace")

    # Handle includeIf conditions
    if len(section) > 1 and section[0].lower() == b"includeif":
        condition = section[1].decode(self.encoding, errors="replace")
        if not self._evaluate_includeif_condition(
            condition, config_dir, condition_matchers
        ):
            return

    # Resolve the include path
    include_path = self._resolve_include_path(path_str, config_dir)
    if not include_path:
        return

    # Check for circular includes
    try:
        abs_path = str(Path(include_path).resolve())
    except (OSError, ValueError) as e:
        # Invalid path - log and skip
        logger.debug("Invalid include path %r: %s", include_path, e)
        return
    if abs_path in self._included_paths:
        return

    # Load and merge the included file
    try:
        # Use provided file opener or default to GitFile
        opener: FileOpener
        if file_opener is None:

            def opener(path: str | os.PathLike[str]) -> IO[bytes]:
                return GitFile(path, "rb")
        else:
            opener = file_opener

        f = opener(include_path)
    except (OSError, ValueError) as e:
        # Git silently ignores missing or unreadable include files
        # Log for debugging purposes
        logger.debug("Invalid include path %r: %s", include_path, e)
    else:
        with f as included_file:
            # Track this path to prevent cycles
            self._included_paths.add(abs_path)

            # Parse the included file; relative includes inside it are
            # resolved against its own directory, one level deeper.
            included_config = ConfigFile.from_file(
                included_file,
                config_dir=os.path.dirname(include_path),
                included_paths=self._included_paths,
                include_depth=include_depth + 1,
                max_include_depth=max_include_depth,
                file_opener=file_opener,
                condition_matchers=condition_matchers,
            )

            # Merge the included configuration
            self._merge_config(included_config)
1165 def _merge_config(self, other: "ConfigFile") -> None:
1166 """Merge another config file into this one."""
1167 for section, values in other._values.items():
1168 if section not in self._values:
1169 self._values[section] = CaseInsensitiveOrderedMultiDict()
1170 for key, value in values.items():
1171 self._values[section][key] = value
1173 def _resolve_include_path(self, path: str, config_dir: str | None) -> str | None:
1174 """Resolve an include path to an absolute path."""
1175 # Expand ~ to home directory
1176 path = os.path.expanduser(path)
1178 # If path is relative and we have a config directory, make it relative to that
1179 if not os.path.isabs(path) and config_dir:
1180 path = os.path.join(config_dir, path)
1182 return path
1184 def _evaluate_includeif_condition(
1185 self,
1186 condition: str,
1187 config_dir: str | None = None,
1188 condition_matchers: Mapping[str, ConditionMatcher] | None = None,
1189 ) -> bool:
1190 """Evaluate an includeIf condition."""
1191 # Try custom matchers first if provided
1192 if condition_matchers:
1193 for prefix, matcher in condition_matchers.items():
1194 if condition.startswith(prefix):
1195 return matcher(condition[len(prefix) :])
1197 # Fall back to built-in matchers
1198 if condition.startswith("hasconfig:"):
1199 return self._evaluate_hasconfig_condition(condition[10:])
1200 else:
1201 # Unknown condition type - log and ignore (Git behavior)
1202 logger.debug("Unknown includeIf condition: %r", condition)
1203 return False
1205 def _evaluate_hasconfig_condition(self, condition: str) -> bool:
1206 """Evaluate a hasconfig condition.
1208 Format: hasconfig:config.key:pattern
1209 Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
1210 """
1211 # Split on the first colon to separate config key from pattern
1212 parts = condition.split(":", 1)
1213 if len(parts) != 2:
1214 logger.debug("Invalid hasconfig condition format: %r", condition)
1215 return False
1217 config_key, pattern = parts
1219 # Parse the config key to get section and name
1220 key_parts = config_key.split(".", 2)
1221 if len(key_parts) < 2:
1222 logger.debug("Invalid hasconfig config key: %r", config_key)
1223 return False
1225 # Handle wildcards in section names (e.g., remote.*)
1226 if len(key_parts) == 3 and key_parts[1] == "*":
1227 # Match any subsection
1228 section_prefix = key_parts[0].encode(self.encoding)
1229 name = key_parts[2].encode(self.encoding)
1231 # Check all sections that match the pattern
1232 for section in self.sections():
1233 if len(section) == 2 and section[0] == section_prefix:
1234 try:
1235 values = list(self.get_multivar(section, name))
1236 for value in values:
1237 if self._match_hasconfig_pattern(value, pattern):
1238 return True
1239 except KeyError:
1240 continue
1241 else:
1242 # Direct section lookup
1243 if len(key_parts) == 2:
1244 section = (key_parts[0].encode(self.encoding),)
1245 name = key_parts[1].encode(self.encoding)
1246 else:
1247 section = (
1248 key_parts[0].encode(self.encoding),
1249 key_parts[1].encode(self.encoding),
1250 )
1251 name = key_parts[2].encode(self.encoding)
1253 try:
1254 values = list(self.get_multivar(section, name))
1255 for value in values:
1256 if self._match_hasconfig_pattern(value, pattern):
1257 return True
1258 except KeyError:
1259 pass
1261 return False
def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
    """Check a raw config value against a hasconfig glob pattern.

    The value is decoded with this config's encoding (undecodable bytes are
    replaced) and the comparison is delegated to ``match_glob_pattern``.

    Args:
      value: Raw config value
      pattern: Glob pattern, e.g. using ``*`` and ``**``

    Returns:
      True if the decoded value matches the pattern
    """
    return match_glob_pattern(
        value.decode(self.encoding, errors="replace"), pattern
    )
@classmethod
def from_path(
    cls,
    path: str | os.PathLike[str],
    *,
    max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
    file_opener: FileOpener | None = None,
    condition_matchers: Mapping[str, ConditionMatcher] | None = None,
) -> "ConfigFile":
    """Read configuration from a file on disk.

    The file is parsed with ``from_file`` using the file's directory as
    the base for relative includes, and the resulting object remembers the
    path in its ``path`` attribute.

    Args:
      path: Path to the configuration file
      max_include_depth: Maximum allowed include depth
      file_opener: Optional callback used to open this file and included
        files; ``GitFile`` in ``"rb"`` mode is used when None
      condition_matchers: Optional dict of condition matchers for includeIf

    Returns:
      The parsed configuration
    """
    fs_path = os.fspath(path)
    base_dir = os.path.dirname(fs_path)

    if file_opener is None:
        stream = GitFile(fs_path, "rb")
    else:
        stream = file_opener(fs_path)

    with stream as f:
        ret = cls.from_file(
            f,
            config_dir=base_dir,
            max_include_depth=max_include_depth,
            file_opener=file_opener,
            condition_matchers=condition_matchers,
        )
        ret.path = fs_path
        return ret
def write_to_path(self, path: str | os.PathLike[str] | None = None) -> None:
    """Write configuration to a file on disk.

    Args:
      path: Destination path; when None, ``self.path`` is used instead

    Raises:
      ValueError: If neither ``path`` nor ``self.path`` is set
    """
    target = self.path if path is None else path
    if target is None:
        raise ValueError("No path specified and no default path available")
    with GitFile(target, "wb") as f:
        self.write_to_file(f)
def write_to_file(self, f: IO[bytes] | _GitFile) -> None:
    """Write configuration to a file-like object.

    Each section is written as a ``[name]`` or ``[name "subsection"]``
    header line followed by one tab-indented ``key = value`` line per
    setting, with values passed through ``_format_string``.

    Args:
      f: Binary file-like object to write to
    """
    for section, settings in self._values.items():
        if len(section) == 2:
            section_name, subsection_name = section
        else:
            # Anything other than a one-element section fails to unpack.
            (section_name,) = section
            subsection_name = None

        if subsection_name is None:
            header = b"[" + section_name + b"]\n"
        else:
            header = b"[" + section_name + b' "' + subsection_name + b'"]\n'
        f.write(header)

        for key, raw_value in settings.items():
            formatted = _format_string(raw_value)
            f.write(b"\t" + key + b" = " + formatted + b"\n")
def get_xdg_config_home_path(*path_segments: str) -> str:
    """Get a path in the XDG config home directory.

    The base directory is ``$XDG_CONFIG_HOME``. When that variable is unset
    or empty, ``~/.config/`` is used instead, as required by the XDG Base
    Directory specification (and as git itself does); an empty value must
    not turn the result into a path relative to the current directory.

    Args:
      *path_segments: Path segments to join to the XDG config home

    Returns:
      Full path in XDG config home directory
    """
    xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
    if not xdg_config_home:
        xdg_config_home = os.path.expanduser("~/.config/")
    return os.path.join(xdg_config_home, *path_segments)
1355def _find_git_in_win_path() -> Iterator[str]:
1356 for exe in ("git.exe", "git.cmd"):
1357 for path in os.environ.get("PATH", "").split(";"):
1358 if os.path.exists(os.path.join(path, exe)):
1359 # in windows native shells (powershell/cmd) exe path is
1360 # .../Git/bin/git.exe or .../Git/cmd/git.exe
1361 #
1362 # in git-bash exe path is .../Git/mingw64/bin/git.exe
1363 git_dir, _bin_dir = os.path.split(path)
1364 yield git_dir
1365 parent_dir, basename = os.path.split(git_dir)
1366 if basename == "mingw32" or basename == "mingw64":
1367 yield parent_dir
1368 break
def _find_git_in_win_reg() -> Iterator[str]:
    """Yield Git install locations recorded in the Windows registry.

    The ``InstallLocation`` value of the ``Git_is1`` uninstall key is read
    from the current-user hive and then the local-machine hive. On
    ``AMD64`` machines the ``Wow6432Node`` variant of the key is used. Keys
    that cannot be opened or read are skipped, and only string (``REG_SZ``)
    values are yielded.

    Returns:
      Iterator over install locations found in the registry
    """
    import platform
    import winreg

    wow_node = "Wow6432Node\\" if platform.machine() == "AMD64" else ""
    subkey = (
        "SOFTWARE\\"
        + wow_node
        + "Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
    )

    hives = (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE)  # type: ignore[attr-defined,unused-ignore]
    for hive in hives:
        with suppress(OSError), winreg.OpenKey(hive, subkey) as handle:  # type: ignore[attr-defined,unused-ignore]
            location, value_type = winreg.QueryValueEx(handle, "InstallLocation")  # type: ignore[attr-defined,unused-ignore]
            if value_type == winreg.REG_SZ:  # type: ignore[attr-defined,unused-ignore]
                yield location
# There is no set standard for system config dirs on Windows. We try the
# following:
#   - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
#     Used if a CGit installation (Git/bin/git.exe) is found in PATH or in
#     the system registry
def get_win_system_paths() -> Iterator[str]:
    """Get current Windows system Git config paths.

    Only returns the current Git for Windows config location, not legacy
    paths: the first installation found via ``PATH`` wins, and the registry
    is consulted only when ``PATH`` yields nothing. At most one path is
    produced.

    Returns:
      Iterator over zero or one ``etc/gitconfig`` path
    """
    # PATH takes priority; the registry is only a fallback.
    for finder in (_find_git_in_win_path, _find_git_in_win_reg):
        first_dir = next(finder(), None)
        if first_dir is not None:
            yield os.path.join(first_dir, "etc", "gitconfig")
            return
def get_win_legacy_system_paths() -> Iterator[str]:
    """Get legacy Windows system Git config paths.

    Yields the deprecated ``%PROGRAMDATA%/Git/config`` location when
    ``PROGRAMDATA`` is set, followed by ``etc/gitconfig`` under every Git
    installation found via ``PATH`` and then via the registry. This function
    can be used for diagnostics or migration purposes.

    Returns:
      Iterator over all candidate config paths
    """
    # Include deprecated PROGRAMDATA location
    program_data = os.environ.get("PROGRAMDATA")
    if program_data is not None:
        yield os.path.join(program_data, "Git", "config")

    # Include all Git installations found
    for finder in (_find_git_in_win_path, _find_git_in_win_reg):
        for install_dir in finder():
            yield os.path.join(install_dir, "etc", "gitconfig")
class StackedConfig(Config):
    """Configuration which reads from multiple config files."""

    def __init__(
        self, backends: list[ConfigFile], writable: ConfigFile | None = None
    ) -> None:
        """Initialize a StackedConfig.

        Args:
          backends: List of config files to read from (in order of precedence)
          writable: Optional config file to write changes to
        """
        self.backends = backends
        self.writable = writable

    def __repr__(self) -> str:
        """Return string representation of StackedConfig."""
        return f"<{self.__class__.__name__} for {self.backends!r}>"

    @classmethod
    def default(cls) -> "StackedConfig":
        """Create a StackedConfig with default system/user config files.

        Returns:
          StackedConfig with default configuration files loaded
        """
        return cls(cls.default_backends())

    @classmethod
    def default_backends(cls) -> list[ConfigFile]:
        """Retrieve the default configuration.

        See git-config(1) for details on the files searched. Candidate
        files that do not exist are skipped.

        Returns:
          Successfully loaded config files, user-level files first
        """
        env = os.environ
        candidate_paths = []

        # GIT_CONFIG_GLOBAL, when set, replaces the usual user config paths.
        global_override = env.get("GIT_CONFIG_GLOBAL")
        if global_override is not None:
            candidate_paths.append(global_override)
        else:
            candidate_paths.append(os.path.expanduser("~/.gitconfig"))
            candidate_paths.append(get_xdg_config_home_path("git", "config"))

        # GIT_CONFIG_SYSTEM replaces the system config path;
        # GIT_CONFIG_NOSYSTEM (when present) disables system config.
        system_override = env.get("GIT_CONFIG_SYSTEM")
        if system_override is not None:
            candidate_paths.append(system_override)
        elif "GIT_CONFIG_NOSYSTEM" not in env:
            candidate_paths.append("/etc/gitconfig")
            if sys.platform == "win32":
                candidate_paths.extend(get_win_system_paths())

        logger.debug("Loading gitconfig from paths: %s", candidate_paths)

        loaded_backends = []
        for candidate in candidate_paths:
            try:
                config_file = ConfigFile.from_path(candidate)
            except FileNotFoundError:
                logger.debug("Gitconfig file not found: %s", candidate)
            else:
                logger.debug("Successfully loaded gitconfig from: %s", candidate)
                loaded_backends.append(config_file)
        return loaded_backends

    def get(self, section: SectionLike, name: NameLike) -> Value:
        """Get value from configuration.

        Backends are consulted in order and the first one that has the
        setting wins.

        Raises:
          KeyError: If no backend has the setting
        """
        section_key = section if isinstance(section, tuple) else (section,)
        for backend in self.backends:
            with suppress(KeyError):
                return backend.get(section_key, name)
        raise KeyError(name)

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Get multiple values from configuration.

        Values from all backends are yielded in backend order; backends
        lacking the setting are skipped.
        """
        section_key = section if isinstance(section, tuple) else (section,)
        for backend in self.backends:
            with suppress(KeyError):
                yield from backend.get_multivar(section_key, name)

    def set(
        self, section: SectionLike, name: NameLike, value: ValueLike | bool
    ) -> None:
        """Set value in configuration.

        Raises:
          NotImplementedError: If no writable backend was configured
        """
        target = self.writable
        if target is None:
            raise NotImplementedError(self.set)
        return target.set(section, name, value)

    def sections(self) -> Iterator[Section]:
        """Get all sections.

        Each distinct section is yielded once, in order of first appearance
        across the backends.
        """
        already_yielded = set()
        for backend in self.backends:
            for section in backend.sections():
                if section in already_yielded:
                    continue
                already_yielded.add(section)
                yield section
def read_submodules(
    path: str | os.PathLike[str],
) -> Iterator[tuple[bytes, bytes, bytes]]:
    """Read a .gitmodules file.

    Args:
      path: Path to the .gitmodules file

    Returns:
      Iterator of (submodule path, url, name) tuples, as produced by
      ``parse_submodules``
    """
    return parse_submodules(ConfigFile.from_path(path))
def parse_submodules(config: ConfigFile) -> Iterator[tuple[bytes, bytes, bytes]]:
    """Parse a gitmodules GitConfig file, returning submodules.

    Only ``[submodule "<name>"]`` sections are considered. Sections without
    a subsection (for example ``[core]`` or a bare ``[submodule]``) are
    skipped instead of failing to unpack.

    Args:
      config: A `ConfigFile`

    Returns:
      Iterator of tuples (submodule path, url, name),
      where name is quoted part of the section's name.
    """
    for section in config.sections():
        if len(section) != 2 or section[0] != b"submodule":
            continue
        try:
            sm_path = config.get(section, b"path")
            sm_url = config.get(section, b"url")
        except KeyError:
            # If either path or url is missing, just ignore this
            # submodule entry and move on to the next one. This is
            # how git itself handles malformed .gitmodule entries.
            continue
        yield (sm_path, sm_url, section[1])
def iter_instead_of(config: Config, push: bool = False) -> Iterable[tuple[str, str]]:
    """Iterate over insteadOf / pushInsteadOf values.

    Only ``[url "<base>"]`` sections are considered; a ``url`` section
    without a subsection has no replacement base and is skipped rather than
    raising ``IndexError``.

    Args:
      config: Configuration to read the ``url`` sections from
      push: Whether to also include ``pushInsteadOf`` values

    Returns:
      Iterable of (needle, replacement) string pairs, where needle is the
      configured prefix and replacement is the section's base URL
    """
    for section in config.sections():
        if len(section) != 2 or section[0] != b"url":
            continue
        replacement = section[1]
        try:
            needles = list(config.get_multivar(section, "insteadOf"))
        except KeyError:
            needles = []
        if push:
            try:
                needles += list(config.get_multivar(section, "pushInsteadOf"))
            except KeyError:
                pass
        for needle in needles:
            assert isinstance(needle, bytes)
            yield needle.decode("utf-8"), replacement.decode("utf-8")
def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
    """Apply insteadOf / pushInsteadOf to a URL.

    Among all configured prefixes that ``orig_url`` starts with, the
    longest one is replaced by its section's base URL; on a tie the first
    one encountered wins. An empty prefix never causes a rewrite.

    Args:
      config: Configuration to read rewrite rules from
      orig_url: URL to rewrite
      push: Whether ``pushInsteadOf`` rules are considered as well

    Returns:
      The rewritten URL, or ``orig_url`` unchanged if no rule applies
    """
    best_match: tuple[str, str] | None = None
    for needle, replacement in iter_instead_of(config, push):
        if not orig_url.startswith(needle):
            continue
        best_length = len(best_match[0]) if best_match is not None else 0
        if len(needle) > best_length:
            best_match = (needle, replacement)

    if best_match is None:
        return orig_url
    matched_prefix, new_base = best_match
    return new_base + orig_url[len(matched_prefix) :]