Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/config.py: 61%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# config.py - Reading and writing Git config files
2# Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Reading and writing Git configuration files.
24Todo:
25 * preserve formatting when updating configuration files
26"""
28import logging
29import os
30import re
31import sys
32from collections.abc import (
33 ItemsView,
34 Iterable,
35 Iterator,
36 KeysView,
37 Mapping,
38 MutableMapping,
39 ValuesView,
40)
41from contextlib import suppress
42from pathlib import Path
43from typing import (
44 IO,
45 Callable,
46 Generic,
47 Optional,
48 TypeVar,
49 Union,
50 overload,
51)
53from .file import GitFile, _GitFile
# A key accepted by the case-insensitive mapping below: a bare str/bytes name
# or a tuple of them (section name followed by optional subsection names).
ConfigKey = Union[str, bytes, tuple[Union[str, bytes], ...]]
# Python-level types a configuration value may be supplied as.
ConfigValue = Union[str, bytes, bool, int]

# Module-level logger; used for debug messages about skipped include files.
logger = logging.getLogger(__name__)

# Type for file opener callback
FileOpener = Callable[[Union[str, os.PathLike[str]]], IO[bytes]]

# Type for includeIf condition matcher
# Takes the condition value (e.g., "main" for onbranch:main) and returns bool
ConditionMatcher = Callable[[str], bool]

# Security limits for include files
# NOTE(review): MAX_INCLUDE_FILE_SIZE is not referenced in the portion of the
# file reviewed here -- confirm where (or whether) it is enforced.
MAX_INCLUDE_FILE_SIZE = 1024 * 1024  # 1MB max for included config files
DEFAULT_MAX_INCLUDE_DEPTH = 10  # Maximum recursion depth for includes
72def _match_gitdir_pattern(
73 path: bytes, pattern: bytes, ignorecase: bool = False
74) -> bool:
75 """Simple gitdir pattern matching for includeIf conditions.
77 This handles the basic gitdir patterns used in includeIf directives.
78 """
79 # Convert to strings for easier manipulation
80 path_str = path.decode("utf-8", errors="replace")
81 pattern_str = pattern.decode("utf-8", errors="replace")
83 # Normalize paths to use forward slashes for consistent matching
84 path_str = path_str.replace("\\", "/")
85 pattern_str = pattern_str.replace("\\", "/")
87 if ignorecase:
88 path_str = path_str.lower()
89 pattern_str = pattern_str.lower()
91 # Handle the common cases for gitdir patterns
92 if pattern_str.startswith("**/") and pattern_str.endswith("/**"):
93 # Pattern like **/dirname/** should match any path containing dirname
94 dirname = pattern_str[3:-3] # Remove **/ and /**
95 # Check if path contains the directory name as a path component
96 return ("/" + dirname + "/") in path_str or path_str.endswith("/" + dirname)
97 elif pattern_str.startswith("**/"):
98 # Pattern like **/filename
99 suffix = pattern_str[3:] # Remove **/
100 return suffix in path_str or path_str.endswith("/" + suffix)
101 elif pattern_str.endswith("/**"):
102 # Pattern like /path/to/dir/** should match /path/to/dir and any subdirectory
103 base_pattern = pattern_str[:-3] # Remove /**
104 return path_str == base_pattern or path_str.startswith(base_pattern + "/")
105 elif "**" in pattern_str:
106 # Handle patterns with ** in the middle
107 parts = pattern_str.split("**")
108 if len(parts) == 2:
109 prefix, suffix = parts
110 # Path must start with prefix and end with suffix (if any)
111 if prefix and not path_str.startswith(prefix):
112 return False
113 if suffix and not path_str.endswith(suffix):
114 return False
115 return True
117 # Direct match or simple glob pattern
118 if "*" in pattern_str or "?" in pattern_str or "[" in pattern_str:
119 import fnmatch
121 return fnmatch.fnmatch(path_str, pattern_str)
122 else:
123 return path_str == pattern_str
def match_glob_pattern(value: str, pattern: str) -> bool:
    r"""Match a value against a glob pattern.

    Supports simple glob patterns: ``*`` matches any run of characters other
    than ``/`` and ``**`` matches any run of characters. Everything else in
    the pattern is matched literally, and the whole value must match.

    Args:
        value: The string to test
        pattern: The glob pattern

    Returns:
        True if the entire value matches the pattern

    Raises:
        ValueError: If the pattern is invalid
    """
    # Escape everything, then re-introduce the two supported wildcards.
    # "**" must be translated first so it is not consumed as two "*".
    pattern_regex = re.escape(pattern)
    pattern_regex = pattern_regex.replace(r"\*\*", ".*")
    pattern_regex = pattern_regex.replace(r"\*", "[^/]*")

    try:
        # fullmatch instead of "^...$": "$" also matches just before a
        # trailing newline, which would let "main\n" match "main".
        return re.fullmatch(pattern_regex, value) is not None
    except re.error as e:
        raise ValueError(f"Invalid glob pattern {pattern!r}: {e}") from e
def lower_key(key: ConfigKey) -> ConfigKey:
    """Normalise a config key for case-insensitive comparison.

    Args:
        key: Configuration key (str, bytes, or tuple)

    Returns:
        For str/bytes, the lowercased key. For a tuple, a tuple whose first
        element (the section name) is lowercased while the remaining elements
        (subsection names) are left untouched. An empty tuple is returned
        unchanged.

    Raises:
        TypeError: If key is not str, bytes, or tuple
    """
    if isinstance(key, tuple):
        if not key:
            return key
        head, *rest = key
        assert isinstance(head, (bytes, str))
        # Only the section name is case-insensitive; subsections keep case.
        return (head.lower(), *rest)

    if isinstance(key, (str, bytes)):
        return key.lower()

    raise TypeError(key)
# Type variables used by CaseInsensitiveOrderedMultiDict below.
K = TypeVar("K", bound=ConfigKey)  # Key type must be ConfigKey
V = TypeVar("V")  # Value type
_T = TypeVar("_T")  # For get() default parameter
class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
    """A case-insensitive ordered dictionary that can store multiple values per key.

    This class maintains the order of insertions and allows multiple values
    for the same key. Keys are compared case-insensitively (see
    ``lower_key``); the key's original spelling is retained for iteration.
    """

    def __init__(self, default_factory: Optional[Callable[[], V]] = None) -> None:
        """Initialize a CaseInsensitiveOrderedMultiDict.

        Args:
            default_factory: Optional factory function for default values
        """
        # Every (key, value) pair in insertion order, keys in original case.
        self._real: list[tuple[K, V]] = []
        # Normalised key -> most recently assigned value.
        self._keyed: dict[ConfigKey, V] = {}
        self._default_factory = default_factory

    @classmethod
    def make(
        cls,
        dict_in: Optional[
            Union[MutableMapping[K, V], "CaseInsensitiveOrderedMultiDict[K, V]"]
        ] = None,
        default_factory: Optional[Callable[[], V]] = None,
    ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
        """Create a CaseInsensitiveOrderedMultiDict from an existing mapping.

        An object that is already an instance of this class is returned as-is
        (``default_factory`` is ignored in that case).

        Args:
            dict_in: Optional mapping to initialize from
            default_factory: Optional factory function for default values

        Returns:
            New CaseInsensitiveOrderedMultiDict instance

        Raises:
            TypeError: If dict_in is not a mapping or None
        """
        if isinstance(dict_in, cls):
            return dict_in

        out = cls(default_factory=default_factory)
        if dict_in is None:
            return out

        if not isinstance(dict_in, MutableMapping):
            raise TypeError(
                f"expected a mutable mapping or None, got {type(dict_in).__name__}"
            )

        for key, value in dict_in.items():
            out[key] = value
        return out

    def __len__(self) -> int:
        """Return the number of unique keys in the dictionary."""
        return len(self._keyed)

    def keys(self) -> KeysView[K]:
        """Return a view of the dictionary's keys.

        Keys are reported in their original case, once each (first spelling
        wins), in insertion order. The view is a snapshot taken at call time.
        """
        unique_keys = list(self)  # __iter__ already de-duplicates

        class UniqueKeysView(KeysView[K]):
            def __init__(self, keys: list[K]):
                self._keys = keys

            def __repr__(self) -> str:
                # The inherited repr reads ``self._mapping``, which this view
                # never sets; without this override repr() raises
                # AttributeError.
                return f"{self.__class__.__name__}({self._keys!r})"

            def __contains__(self, key: object) -> bool:
                return key in self._keys

            def __iter__(self) -> Iterator[K]:
                return iter(self._keys)

            def __len__(self) -> int:
                return len(self._keys)

        return UniqueKeysView(unique_keys)

    def items(self) -> ItemsView[K, V]:
        """Return a view of the dictionary's (key, value) pairs in insertion order.

        Unlike ``keys()``, every stored pair is reported, so a key holding
        several values appears several times.
        """

        # Return a view that iterates over the real list to preserve order
        class OrderedItemsView(ItemsView[K, V]):
            """Items view that preserves insertion order."""

            def __init__(self, mapping: CaseInsensitiveOrderedMultiDict[K, V]):
                self._mapping = mapping

            def __iter__(self) -> Iterator[tuple[K, V]]:
                return iter(self._mapping._real)

            def __len__(self) -> int:
                return len(self._mapping._real)

            def __contains__(self, item: object) -> bool:
                if not isinstance(item, tuple) or len(item) != 2:
                    return False
                key, value = item
                return any(k == key and v == value for k, v in self._mapping._real)

        return OrderedItemsView(self)

    def __iter__(self) -> Iterator[K]:
        """Iterate over the dictionary's keys (original case, de-duplicated)."""
        seen = set()
        for k, _ in self._real:
            lower = lower_key(k)
            if lower not in seen:
                seen.add(lower)
                yield k

    def values(self) -> ValuesView[V]:
        """Return a view of the dictionary's values (the last value per key)."""
        return self._keyed.values()

    def __setitem__(self, key: K, value: V) -> None:
        """Set a value for a key, appending to existing values."""
        self._real.append((key, value))
        self._keyed[lower_key(key)] = value

    def set(self, key: K, value: V) -> None:
        """Set a value for a key, replacing all existing values.

        Args:
            key: The key to set
            value: The value to set
        """
        lower = lower_key(key)
        self._real = [(k, v) for k, v in self._real if lower_key(k) != lower]
        self._real.append((key, value))
        self._keyed[lower] = value

    def __delitem__(self, key: K) -> None:
        """Delete all values for a key.

        Raises:
            KeyError: If the key is not found
        """
        lower_k = lower_key(key)
        # Raises KeyError before anything is modified when the key is absent.
        del self._keyed[lower_k]
        self._real = [(k, v) for k, v in self._real if lower_key(k) != lower_k]

    def __getitem__(self, item: K) -> V:
        """Get the last value for a key.

        Raises:
            KeyError: If the key is not found
        """
        return self._keyed[lower_key(item)]

    def get(self, key: K, /, default: Union[V, _T, None] = None) -> Union[V, _T, None]:  # type: ignore[override]
        """Get the last value for a key, or a default if not found.

        A value produced by ``default_factory`` is returned but not stored.

        Args:
            key: The key to look up
            default: Default value to return if key not found

        Returns:
            The value for the key, or default/default_factory result if not found
        """
        try:
            return self[key]
        except KeyError:
            if default is not None:
                return default
            if self._default_factory is not None:
                return self._default_factory()
            return None

    def get_all(self, key: K) -> Iterator[V]:
        """Get all values for a key in insertion order.

        Args:
            key: The key to look up

        Returns:
            Iterator of all values for the key (empty if the key is absent)
        """
        lowered_key = lower_key(key)
        for actual, value in self._real:
            if lower_key(actual) == lowered_key:
                yield value

    def setdefault(self, key: K, default: Optional[V] = None) -> V:
        """Get value for key, setting it to default if not present.

        Args:
            key: The key to look up
            default: Default value to set if key not found

        Returns:
            The existing value or the newly set default

        Raises:
            KeyError: If key not found and no default or default_factory
        """
        try:
            return self[key]
        except KeyError:
            if default is not None:
                self[key] = default
                return default
            if self._default_factory is not None:
                value = self._default_factory()
                self[key] = value
                return value
            raise
# Canonical (bytes) forms used internally by the config classes, and the
# looser "-Like" forms accepted from callers and encoded before use.
Name = bytes
NameLike = Union[bytes, str]
# A section is a tuple: (section,) or (section, subsection, ...).
Section = tuple[bytes, ...]
SectionLike = Union[bytes, str, tuple[Union[bytes, str], ...]]
Value = bytes
ValueLike = Union[bytes, str]
class Config:
    """A Git configuration."""

    def get(self, section: SectionLike, name: NameLike) -> Value:
        """Look up the value of a single configuration setting.

        Args:
            section: Tuple with section name and optional subsection name
            name: Variable name
        Returns:
            Contents of the setting
        Raises:
            KeyError: if the value is not set
        """
        raise NotImplementedError(self.get)

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Look up every value of a multi-valued configuration setting.

        Args:
            section: Tuple with section name and optional subsection name
            name: Variable name
        Returns:
            Contents of the setting as iterable
        Raises:
            KeyError: if the value is not set
        """
        raise NotImplementedError(self.get_multivar)

    @overload
    def get_boolean(
        self, section: SectionLike, name: NameLike, default: bool
    ) -> bool: ...

    @overload
    def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]: ...

    def get_boolean(
        self, section: SectionLike, name: NameLike, default: Optional[bool] = None
    ) -> Optional[bool]:
        """Look up a configuration setting and interpret it as a boolean.

        Only ``true`` and ``false`` (in any letter case) are recognised.

        Args:
            section: Tuple with section name and optional subsection name
            name: Name of the setting
            default: Value to return if the setting is not found

        Returns:
            The setting as a bool, or ``default`` when it is not set

        Raises:
            ValueError: if the stored value is neither ``true`` nor ``false``
        """
        try:
            raw = self.get(section, name)
        except KeyError:
            return default

        lowered = raw.lower()
        if lowered == b"true":
            return True
        if lowered == b"false":
            return False
        raise ValueError(f"not a valid boolean string: {raw!r}")

    def set(
        self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
    ) -> None:
        """Store a configuration value.

        Args:
            section: Tuple with section name and optional subsection name
            name: Name of the configuration value
            value: value of the setting
        """
        raise NotImplementedError(self.set)

    def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
        """Iterate over the configuration pairs for a specific section.

        Args:
            section: Tuple with section name and optional subsection name
        Returns:
            Iterator over (name, value) pairs
        """
        raise NotImplementedError(self.items)

    def sections(self) -> Iterator[Section]:
        """Iterate over the sections.

        Returns: Iterator over section tuples
        """
        raise NotImplementedError(self.sections)

    def has_section(self, name: Section) -> bool:
        """Report whether a section exists.

        The check is a plain membership test over ``sections()``.

        Args:
            name: Name of section to check for
        Returns:
            boolean indicating whether the section exists
        """
        return name in self.sections()
class ConfigDict(Config):
    """Git configuration stored in a dictionary."""

    def __init__(
        self,
        values: Union[
            MutableMapping[Section, CaseInsensitiveOrderedMultiDict[Name, Value]], None
        ] = None,
        encoding: Union[str, None] = None,
    ) -> None:
        """Create a new ConfigDict.

        Args:
            values: Optional initial mapping of section -> settings
            encoding: Encoding used to convert str sections/names/values to
                bytes; defaults to ``sys.getdefaultencoding()``
        """
        if encoding is None:
            encoding = sys.getdefaultencoding()
        self.encoding = encoding
        # Unknown sections are created on demand via the default factory.
        self._values: CaseInsensitiveOrderedMultiDict[
            Section, CaseInsensitiveOrderedMultiDict[Name, Value]
        ] = CaseInsensitiveOrderedMultiDict.make(
            values, default_factory=CaseInsensitiveOrderedMultiDict
        )

    def __repr__(self) -> str:
        """Return string representation of ConfigDict."""
        return f"{self.__class__.__name__}({self._values!r})"

    def __eq__(self, other: object) -> bool:
        """Check equality with another ConfigDict."""
        return isinstance(other, self.__class__) and other._values == self._values

    def __getitem__(self, key: Section) -> CaseInsensitiveOrderedMultiDict[Name, Value]:
        """Get configuration values for a section.

        Raises:
            KeyError: If section not found
        """
        return self._values.__getitem__(key)

    def __setitem__(
        self, key: Section, value: CaseInsensitiveOrderedMultiDict[Name, Value]
    ) -> None:
        """Set configuration values for a section."""
        return self._values.__setitem__(key, value)

    def __delitem__(self, key: Section) -> None:
        """Delete a configuration section.

        Raises:
            KeyError: If section not found
        """
        return self._values.__delitem__(key)

    def __iter__(self) -> Iterator[Section]:
        """Iterate over configuration sections."""
        return self._values.__iter__()

    def __len__(self) -> int:
        """Return the number of sections."""
        return self._values.__len__()

    def keys(self) -> KeysView[Section]:
        """Return a view of section names."""
        return self._values.keys()

    @classmethod
    def _parse_setting(cls, name: str) -> tuple[str, Optional[str], str]:
        """Split a dotted setting name into (section, subsection, variable).

        The section is everything before the first dot and the variable is
        everything after the last dot; whatever lies between is the
        subsection, which may itself contain dots
        (``url.https://example.com/.insteadOf``).

        Args:
            name: Dotted setting name such as ``core.bare``

        Returns:
            Tuple of section, subsection (None when absent) and variable name

        Raises:
            IndexError: If name contains no dot at all
        """
        parts = name.split(".")
        if len(parts) >= 3:
            # Re-join the middle so dots inside the subsection survive.
            return (parts[0], ".".join(parts[1:-1]), parts[-1])
        return (parts[0], None, parts[1])

    def _check_section_and_name(
        self, section: SectionLike, name: NameLike
    ) -> tuple[Section, Name]:
        """Normalise a section and name to their bytes forms.

        A bare section is wrapped in a 1-tuple; every non-bytes component is
        encoded with ``self.encoding``.
        """
        if not isinstance(section, tuple):
            section = (section,)

        checked_section = tuple(
            part if isinstance(part, bytes) else part.encode(self.encoding)
            for part in section
        )

        if not isinstance(name, bytes):
            name = name.encode(self.encoding)

        return checked_section, name

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Get multiple values for a configuration setting.

        For a section with a subsection, the subsection is tried first and the
        bare section is used as a fallback when the subsection does not exist.

        Args:
            section: Section name
            name: Setting name

        Returns:
            Iterator of configuration values

        Raises:
            KeyError: if the section does not exist
        """
        section, name = self._check_section_and_name(section, name)

        if len(section) > 1:
            try:
                return self._values[section].get_all(name)
            except KeyError:
                pass

        return self._values[(section[0],)].get_all(name)

    def get(
        self,
        section: SectionLike,
        name: NameLike,
    ) -> Value:
        """Get a configuration value.

        For a section with a subsection, the subsection is tried first and the
        bare section is used as a fallback.

        Args:
            section: Section name
            name: Setting name

        Returns:
            Configuration value

        Raises:
            KeyError: if the value is not set
        """
        section, name = self._check_section_and_name(section, name)

        if len(section) > 1:
            try:
                return self._values[section][name]
            except KeyError:
                pass

        return self._values[(section[0],)][name]

    def _encode_value(self, value: Union[ValueLike, bool]) -> Value:
        """Convert a bool/str/bytes value to the bytes stored in the config."""
        if isinstance(value, bool):
            return b"true" if value else b"false"
        if not isinstance(value, bytes):
            return value.encode(self.encoding)
        return value

    def set(
        self,
        section: SectionLike,
        name: NameLike,
        value: Union[ValueLike, bool],
    ) -> None:
        """Set a configuration value, replacing any existing values.

        Args:
            section: Section name
            name: Setting name
            value: Configuration value
        """
        section, name = self._check_section_and_name(section, name)
        encoded = self._encode_value(value)

        section_dict = self._values.setdefault(section)
        # Section containers supplied by the caller may be plain mappings
        # without a replace-all ``set`` method.
        if hasattr(section_dict, "set"):
            section_dict.set(name, encoded)
        else:
            section_dict[name] = encoded

    def add(
        self,
        section: SectionLike,
        name: NameLike,
        value: Union[ValueLike, bool],
    ) -> None:
        """Add a value to a configuration setting, creating a multivar if needed.

        Args:
            section: Section name
            name: Setting name
            value: Configuration value to append
        """
        section, name = self._check_section_and_name(section, name)
        self._values.setdefault(section)[name] = self._encode_value(value)

    def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
        """Get items in a section.

        Args:
            section: Section name

        Returns:
            Iterator over (name, value) pairs
        """
        section_bytes, _ = self._check_section_and_name(section, b"")
        section_dict = self._values.get(section_bytes)
        if section_dict is not None:
            return iter(section_dict.items())
        return iter([])

    def sections(self) -> Iterator[Section]:
        """Get all sections."""
        return iter(self._values.keys())
707def _format_string(value: bytes) -> bytes:
708 if (
709 value.startswith((b" ", b"\t"))
710 or value.endswith((b" ", b"\t"))
711 or b"#" in value
712 ):
713 return b'"' + _escape_value(value) + b'"'
714 else:
715 return _escape_value(value)
718_ESCAPE_TABLE = {
719 ord(b"\\"): ord(b"\\"),
720 ord(b'"'): ord(b'"'),
721 ord(b"n"): ord(b"\n"),
722 ord(b"t"): ord(b"\t"),
723 ord(b"b"): ord(b"\b"),
724}
725_COMMENT_CHARS = [ord(b"#"), ord(b";")]
726_WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
729def _parse_string(value: bytes) -> bytes:
730 value_array = bytearray(value.strip())
731 ret = bytearray()
732 whitespace = bytearray()
733 in_quotes = False
734 i = 0
735 while i < len(value_array):
736 c = value_array[i]
737 if c == ord(b"\\"):
738 i += 1
739 if i >= len(value_array):
740 # Backslash at end of string - treat as literal backslash
741 if whitespace:
742 ret.extend(whitespace)
743 whitespace = bytearray()
744 ret.append(ord(b"\\"))
745 else:
746 try:
747 v = _ESCAPE_TABLE[value_array[i]]
748 if whitespace:
749 ret.extend(whitespace)
750 whitespace = bytearray()
751 ret.append(v)
752 except KeyError:
753 # Unknown escape sequence - treat backslash as literal and process next char normally
754 if whitespace:
755 ret.extend(whitespace)
756 whitespace = bytearray()
757 ret.append(ord(b"\\"))
758 i -= 1 # Reprocess the character after the backslash
759 elif c == ord(b'"'):
760 in_quotes = not in_quotes
761 elif c in _COMMENT_CHARS and not in_quotes:
762 # the rest of the line is a comment
763 break
764 elif c in _WHITESPACE_CHARS:
765 whitespace.append(c)
766 else:
767 if whitespace:
768 ret.extend(whitespace)
769 whitespace = bytearray()
770 ret.append(c)
771 i += 1
773 if in_quotes:
774 raise ValueError("missing end quote")
776 return bytes(ret)
779def _escape_value(value: bytes) -> bytes:
780 """Escape a value."""
781 value = value.replace(b"\\", b"\\\\")
782 value = value.replace(b"\r", b"\\r")
783 value = value.replace(b"\n", b"\\n")
784 value = value.replace(b"\t", b"\\t")
785 value = value.replace(b'"', b'\\"')
786 return value
789def _check_variable_name(name: bytes) -> bool:
790 for i in range(len(name)):
791 c = name[i : i + 1]
792 if not c.isalnum() and c != b"-":
793 return False
794 return True
797def _check_section_name(name: bytes) -> bool:
798 for i in range(len(name)):
799 c = name[i : i + 1]
800 if not c.isalnum() and c not in (b"-", b"."):
801 return False
802 return True
805def _strip_comments(line: bytes) -> bytes:
806 comment_bytes = {ord(b"#"), ord(b";")}
807 quote = ord(b'"')
808 string_open = False
809 # Normalize line to bytearray for simple 2/3 compatibility
810 for i, character in enumerate(bytearray(line)):
811 # Comment characters outside balanced quotes denote comment start
812 if character == quote:
813 string_open = not string_open
814 elif not string_open and character in comment_bytes:
815 return line[:i]
816 return line
819def _is_line_continuation(value: bytes) -> bool:
820 """Check if a value ends with a line continuation backslash.
822 A line continuation occurs when a line ends with a backslash that is:
823 1. Not escaped (not preceded by another backslash)
824 2. Not within quotes
826 Args:
827 value: The value to check
829 Returns:
830 True if the value ends with a line continuation backslash
831 """
832 if not value.endswith((b"\\\n", b"\\\r\n")):
833 return False
835 # Remove only the newline characters, keep the content including the backslash
836 if value.endswith(b"\\\r\n"):
837 content = value[:-2] # Remove \r\n, keep the \
838 else:
839 content = value[:-1] # Remove \n, keep the \
841 if not content.endswith(b"\\"):
842 return False
844 # Count consecutive backslashes at the end
845 backslash_count = 0
846 for i in range(len(content) - 1, -1, -1):
847 if content[i : i + 1] == b"\\":
848 backslash_count += 1
849 else:
850 break
852 # If we have an odd number of backslashes, the last one is a line continuation
853 # If we have an even number, they are all escaped and there's no continuation
854 return backslash_count % 2 == 1
def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
    """Parse a section header such as ``[core]`` or ``[remote "origin"]``.

    Args:
        line: Line beginning with the opening ``[``

    Returns:
        Tuple of the parsed section and whatever followed the closing ``]``
        (comments and trailing whitespace already removed)

    Raises:
        ValueError: If the closing bracket is missing, the section name
            contains invalid characters, or a subsection is not quoted
    """
    quote = ord(b'"')
    backslash = ord(b"\\")
    closing_bracket = ord(b"]")

    header = _strip_comments(line).rstrip()

    # Find the first "]" that is neither quoted nor backslash-escaped.
    close_at = -1
    quoted = False
    skip_next = False
    for position, byte in enumerate(header):
        if skip_next:
            skip_next = False
        elif byte == quote:
            quoted = not quoted
        elif byte == backslash:
            skip_next = True
        elif byte == closing_bracket and not quoted:
            close_at = position
            break
    if close_at < 0:
        raise ValueError("expected trailing ]")

    name, *rest = header[1:close_at].split(b" ", 1)
    remainder = header[close_at + 1 :]

    section: Section
    if rest:
        # Handle subsections - Git allows more complex syntax for certain
        # sections like includeIf
        subsection = rest[0]
        if subsection[:1] == b'"' and subsection[-1:] == b'"':
            # Standard quoted subsection
            subsection = subsection[1:-1]
        elif name == b"includeIf":
            # includeIf conditions are accepted without strict quote
            # validation; quotes are still removed when present.
            subsection = subsection.strip()
            if subsection[:1] == b'"' and subsection[-1:] == b'"':
                subsection = subsection[1:-1]
        else:
            # Other sections must have quoted subsections
            raise ValueError(f"Invalid subsection {subsection!r}")
        if not _check_section_name(name):
            raise ValueError(f"invalid section name {name!r}")
        section = (name, subsection)
    else:
        if not _check_section_name(name):
            raise ValueError(f"invalid section name {name!r}")
        # Dotted form: "[section.subsection]"
        head, dot, tail = name.partition(b".")
        section = (head, tail) if dot else (head,)
    return section, remainder
906class ConfigFile(ConfigDict):
907 """A Git configuration file, like .git/config or ~/.gitconfig."""
909 def __init__(
910 self,
911 values: Union[
912 MutableMapping[Section, CaseInsensitiveOrderedMultiDict[Name, Value]], None
913 ] = None,
914 encoding: Union[str, None] = None,
915 ) -> None:
916 """Initialize a ConfigFile.
918 Args:
919 values: Optional mapping of configuration values
920 encoding: Optional encoding for the file (defaults to system encoding)
921 """
922 super().__init__(values=values, encoding=encoding)
923 self.path: Optional[str] = None
924 self._included_paths: set[str] = set() # Track included files to prevent cycles
    @classmethod
    def from_file(
        cls,
        f: IO[bytes],
        *,
        config_dir: Optional[str] = None,
        included_paths: Optional[set[str]] = None,
        include_depth: int = 0,
        max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
        file_opener: Optional[FileOpener] = None,
        condition_matchers: Optional[Mapping[str, ConditionMatcher]] = None,
    ) -> "ConfigFile":
        """Read configuration from a file-like object.

        The file is parsed line by line. ``include``/``includeIf`` ``path``
        settings are processed as soon as they are read, so the included
        file's values are merged at that point.

        Args:
          f: File-like object to read from
          config_dir: Directory containing the config file (for relative includes)
          included_paths: Set of already included paths (to prevent cycles)
          include_depth: Current include depth (to prevent infinite recursion)
          max_include_depth: Maximum allowed include depth
          file_opener: Optional callback to open included files
          condition_matchers: Optional dict of condition matchers for includeIf

        Returns:
          The parsed configuration

        Raises:
          ValueError: If the include depth limit is exceeded, a setting
            appears before any section header, a variable name is invalid, or
            a header/value fails to parse
        """
        if include_depth > max_include_depth:
            # Prevent excessive recursion
            raise ValueError(f"Maximum include depth ({max_include_depth}) exceeded")

        ret = cls()
        if included_paths is not None:
            # Work on a copy so the caller's set is not mutated.
            ret._included_paths = included_paths.copy()

        section: Optional[Section] = None
        # ``setting`` is non-None only while a value is being continued onto
        # following lines; ``continuation`` holds the text gathered so far.
        setting = None
        continuation = None
        for lineno, line in enumerate(f.readlines()):
            if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
                # Skip a UTF-8 byte order mark at the start of the file.
                line = line[3:]
            # Leading whitespace is removed from every line, including
            # continuation lines.
            line = line.lstrip()
            if setting is None:
                if len(line) > 0 and line[:1] == b"[":
                    section, line = _parse_section_header_line(line)
                    # Register the section even if it holds no settings.
                    ret._values.setdefault(section)
                if _strip_comments(line).strip() == b"":
                    continue
                if section is None:
                    raise ValueError(f"setting {line!r} without section")
                try:
                    setting, value = line.split(b"=", 1)
                except ValueError:
                    # A name without "=" is stored with the value "true".
                    setting = line
                    value = b"true"
                setting = setting.strip()
                if not _check_variable_name(setting):
                    raise ValueError(f"invalid variable name {setting!r}")
                if _is_line_continuation(value):
                    # Drop the trailing backslash together with the newline.
                    if value.endswith(b"\\\r\n"):
                        continuation = value[:-3]
                    else:
                        continuation = value[:-2]
                else:
                    continuation = None
                    value = _parse_string(value)
                    ret._values[section][setting] = value

                    # Process include/includeIf directives
                    ret._handle_include_directive(
                        section,
                        setting,
                        value,
                        config_dir=config_dir,
                        include_depth=include_depth,
                        max_include_depth=max_include_depth,
                        file_opener=file_opener,
                        condition_matchers=condition_matchers,
                    )

                    setting = None
            else:  # continuation line
                assert continuation is not None
                if _is_line_continuation(line):
                    if line.endswith(b"\\\r\n"):
                        continuation += line[:-3]
                    else:
                        continuation += line[:-2]
                else:
                    # Last line of the continued value: parse and store it.
                    continuation += line
                    value = _parse_string(continuation)
                    assert section is not None  # Already checked above
                    ret._values[section][setting] = value

                    # Process include/includeIf directives
                    ret._handle_include_directive(
                        section,
                        setting,
                        value,
                        config_dir=config_dir,
                        include_depth=include_depth,
                        max_include_depth=max_include_depth,
                        file_opener=file_opener,
                        condition_matchers=condition_matchers,
                    )

                    continuation = None
                    setting = None
        # NOTE: a value still being continued when the input ends is never
        # stored, because values are only stored in the branches above.
        return ret
1032 def _handle_include_directive(
1033 self,
1034 section: Optional[Section],
1035 setting: bytes,
1036 value: bytes,
1037 *,
1038 config_dir: Optional[str],
1039 include_depth: int,
1040 max_include_depth: int,
1041 file_opener: Optional[FileOpener],
1042 condition_matchers: Optional[Mapping[str, ConditionMatcher]],
1043 ) -> None:
1044 """Handle include/includeIf directives during config parsing."""
1045 if (
1046 section is not None
1047 and setting == b"path"
1048 and (
1049 section[0].lower() == b"include"
1050 or (len(section) > 1 and section[0].lower() == b"includeif")
1051 )
1052 ):
1053 self._process_include(
1054 section,
1055 value,
1056 config_dir=config_dir,
1057 include_depth=include_depth,
1058 max_include_depth=max_include_depth,
1059 file_opener=file_opener,
1060 condition_matchers=condition_matchers,
1061 )
    def _process_include(
        self,
        section: Section,
        path_value: bytes,
        *,
        config_dir: Optional[str],
        include_depth: int,
        max_include_depth: int,
        file_opener: Optional[FileOpener],
        condition_matchers: Optional[Mapping[str, ConditionMatcher]],
    ) -> None:
        """Process an include or includeIf directive.

        Resolves the include path, skips it when the includeIf condition is
        false or the file was already included, then parses the file and
        merges its settings into this config. Files that cannot be opened are
        skipped with a debug log message.

        Args:
          section: Section the ``path`` setting was found in
          path_value: Raw value of the ``path`` setting
          config_dir: Directory of the including file (for relative paths)
          include_depth: Include depth of the including file
          max_include_depth: Maximum allowed include depth
          file_opener: Optional callback used to open the included file
          condition_matchers: Optional condition matchers for includeIf
        """
        path_str = path_value.decode(self.encoding, errors="replace")

        # Handle includeIf conditions
        if len(section) > 1 and section[0].lower() == b"includeif":
            condition = section[1].decode(self.encoding, errors="replace")
            if not self._evaluate_includeif_condition(
                condition, config_dir, condition_matchers
            ):
                return

        # Resolve the include path
        include_path = self._resolve_include_path(path_str, config_dir)
        if not include_path:
            return

        # Check for circular includes
        try:
            abs_path = str(Path(include_path).resolve())
        except (OSError, ValueError) as e:
            # Invalid path - log and skip
            logger.debug("Invalid include path %r: %s", include_path, e)
            return
        if abs_path in self._included_paths:
            # Already included along this chain; skip to avoid a cycle.
            return

        # Load and merge the included file
        try:
            # Use provided file opener or default to GitFile
            opener: FileOpener
            if file_opener is None:

                def opener(path: Union[str, os.PathLike[str]]) -> IO[bytes]:
                    return GitFile(path, "rb")
            else:
                opener = file_opener

            f = opener(include_path)
        except (OSError, ValueError) as e:
            # Git silently ignores missing or unreadable include files
            # Log for debugging purposes
            logger.debug("Invalid include path %r: %s", include_path, e)
        else:
            with f as included_file:
                # Track this path to prevent cycles
                # (recorded before parsing so nested includes see it too)
                self._included_paths.add(abs_path)

                # Parse the included file
                included_config = ConfigFile.from_file(
                    included_file,
                    # Relative includes inside the included file are resolved
                    # against that file's own directory.
                    config_dir=os.path.dirname(include_path),
                    included_paths=self._included_paths,
                    include_depth=include_depth + 1,
                    max_include_depth=max_include_depth,
                    file_opener=file_opener,
                    condition_matchers=condition_matchers,
                )

                # Merge the included configuration
                self._merge_config(included_config)
1135 def _merge_config(self, other: "ConfigFile") -> None:
1136 """Merge another config file into this one."""
1137 for section, values in other._values.items():
1138 if section not in self._values:
1139 self._values[section] = CaseInsensitiveOrderedMultiDict()
1140 for key, value in values.items():
1141 self._values[section][key] = value
1143 def _resolve_include_path(
1144 self, path: str, config_dir: Optional[str]
1145 ) -> Optional[str]:
1146 """Resolve an include path to an absolute path."""
1147 # Expand ~ to home directory
1148 path = os.path.expanduser(path)
1150 # If path is relative and we have a config directory, make it relative to that
1151 if not os.path.isabs(path) and config_dir:
1152 path = os.path.join(config_dir, path)
1154 return path
def _evaluate_includeif_condition(
    self,
    condition: str,
    config_dir: Optional[str] = None,
    condition_matchers: Optional[Mapping[str, ConditionMatcher]] = None,
) -> bool:
    """Decide whether an includeIf condition holds.

    Custom matchers are consulted first: the first one whose prefix the
    condition starts with decides the result. Otherwise only the built-in
    ``hasconfig:`` condition is understood; anything else is logged and
    treated as false.

    Args:
      condition: Condition text taken from the includeIf subsection
      config_dir: Accepted for the caller's convenience; not used here
      condition_matchers: Optional mapping of condition prefix to a
        callable receiving the remainder of the condition

    Returns:
      True if the condition is satisfied, False otherwise
    """
    for prefix, matcher in (condition_matchers or {}).items():
        if condition.startswith(prefix):
            return matcher(condition[len(prefix) :])

    builtin_prefix = "hasconfig:"
    if not condition.startswith(builtin_prefix):
        # Unknown condition type - log and ignore (Git behavior)
        logger.debug("Unknown includeIf condition: %r", condition)
        return False
    return self._evaluate_hasconfig_condition(condition[len(builtin_prefix) :])
1177 def _evaluate_hasconfig_condition(self, condition: str) -> bool:
1178 """Evaluate a hasconfig condition.
1180 Format: hasconfig:config.key:pattern
1181 Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
1182 """
1183 # Split on the first colon to separate config key from pattern
1184 parts = condition.split(":", 1)
1185 if len(parts) != 2:
1186 logger.debug("Invalid hasconfig condition format: %r", condition)
1187 return False
1189 config_key, pattern = parts
1191 # Parse the config key to get section and name
1192 key_parts = config_key.split(".", 2)
1193 if len(key_parts) < 2:
1194 logger.debug("Invalid hasconfig config key: %r", config_key)
1195 return False
1197 # Handle wildcards in section names (e.g., remote.*)
1198 if len(key_parts) == 3 and key_parts[1] == "*":
1199 # Match any subsection
1200 section_prefix = key_parts[0].encode(self.encoding)
1201 name = key_parts[2].encode(self.encoding)
1203 # Check all sections that match the pattern
1204 for section in self.sections():
1205 if len(section) == 2 and section[0] == section_prefix:
1206 try:
1207 values = list(self.get_multivar(section, name))
1208 for value in values:
1209 if self._match_hasconfig_pattern(value, pattern):
1210 return True
1211 except KeyError:
1212 continue
1213 else:
1214 # Direct section lookup
1215 if len(key_parts) == 2:
1216 section = (key_parts[0].encode(self.encoding),)
1217 name = key_parts[1].encode(self.encoding)
1218 else:
1219 section = (
1220 key_parts[0].encode(self.encoding),
1221 key_parts[1].encode(self.encoding),
1222 )
1223 name = key_parts[2].encode(self.encoding)
1225 try:
1226 values = list(self.get_multivar(section, name))
1227 for value in values:
1228 if self._match_hasconfig_pattern(value, pattern):
1229 return True
1230 except KeyError:
1231 pass
1233 return False
def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
    """Check a raw config value against a hasconfig glob pattern.

    The value is decoded with this config's encoding (undecodable bytes
    are replaced) and handed to ``match_glob_pattern``.

    Args:
      value: Raw config value
      pattern: Glob pattern, e.g. using ``*`` and ``**``

    Returns:
      Result of ``match_glob_pattern`` for the decoded value
    """
    decoded = value.decode(self.encoding, errors="replace")
    return match_glob_pattern(decoded, pattern)
@classmethod
def from_path(
    cls,
    path: Union[str, os.PathLike[str]],
    *,
    max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
    file_opener: Optional[FileOpener] = None,
    condition_matchers: Optional[Mapping[str, ConditionMatcher]] = None,
) -> "ConfigFile":
    """Read configuration from a file on disk.

    Args:
      path: Path to the configuration file
      max_include_depth: Maximum allowed include depth
      file_opener: Optional callback to open the file and any included
        files; ``GitFile`` in ``"rb"`` mode is used for this file when
        it is None
      condition_matchers: Optional dict of condition matchers for includeIf

    Returns:
      The parsed config, with its ``path`` attribute set to ``path``
      (converted with ``os.fspath``)
    """
    config_path = os.fspath(path)

    def _default_opener(p: Union[str, os.PathLike[str]]) -> IO[bytes]:
        return GitFile(p, "rb")

    open_config: FileOpener = (
        _default_opener if file_opener is None else file_opener
    )

    with open_config(config_path) as stream:
        loaded = cls.from_file(
            stream,
            # Relative includes are resolved against the file's directory.
            config_dir=os.path.dirname(config_path),
            max_include_depth=max_include_depth,
            file_opener=file_opener,
            condition_matchers=condition_matchers,
        )
        loaded.path = config_path
        return loaded
def write_to_path(
    self, path: Optional[Union[str, os.PathLike[str]]] = None
) -> None:
    """Write configuration to a file on disk.

    Args:
      path: Destination path; ``self.path`` is used when omitted

    Raises:
      ValueError: If no path is given and ``self.path`` is None
    """
    destination: Optional[Union[str, os.PathLike[str]]] = (
        self.path if path is None else path
    )
    if destination is None:
        raise ValueError("No path specified and no default path available")
    with GitFile(destination, "wb") as stream:
        self.write_to_file(stream)
def write_to_file(self, f: Union[IO[bytes], _GitFile]) -> None:
    """Write configuration to a file-like object.

    Each section is emitted as a ``[section]`` or ``[section "subsection"]``
    header followed by one tab-indented ``key = value`` line per setting,
    with values passed through ``_format_string``.

    Args:
      f: Binary file-like object to write to
    """
    for section, settings in self._values.items():
        subsection_name = None
        if len(section) == 2:
            section_name, subsection_name = section
        else:
            (section_name,) = section

        if subsection_name is None:
            header = b"[" + section_name + b"]\n"
        else:
            header = b"[" + section_name + b' "' + subsection_name + b'"]\n'
        f.write(header)

        for key, raw_value in settings.items():
            formatted = _format_string(raw_value)
            f.write(b"\t" + key + b" = " + formatted + b"\n")
def get_xdg_config_home_path(*path_segments: str) -> str:
    """Get a path in the XDG config home directory.

    The base directory is ``$XDG_CONFIG_HOME``. If that variable is unset
    or empty, ``~/.config/`` is used instead, as required by the XDG Base
    Directory Specification (an empty value would otherwise produce a
    path relative to the current working directory).

    Args:
      *path_segments: Path segments to join to the XDG config home

    Returns:
      Full path in XDG config home directory
    """
    xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
    if not xdg_config_home:
        # Unset and empty are treated alike.
        xdg_config_home = os.path.expanduser("~/.config/")
    return os.path.join(xdg_config_home, *path_segments)
1329def _find_git_in_win_path() -> Iterator[str]:
1330 for exe in ("git.exe", "git.cmd"):
1331 for path in os.environ.get("PATH", "").split(";"):
1332 if os.path.exists(os.path.join(path, exe)):
1333 # in windows native shells (powershell/cmd) exe path is
1334 # .../Git/bin/git.exe or .../Git/cmd/git.exe
1335 #
1336 # in git-bash exe path is .../Git/mingw64/bin/git.exe
1337 git_dir, _bin_dir = os.path.split(path)
1338 yield git_dir
1339 parent_dir, basename = os.path.split(git_dir)
1340 if basename == "mingw32" or basename == "mingw64":
1341 yield parent_dir
1342 break
def _find_git_in_win_reg() -> Iterator[str]:
    """Yield Git install locations recorded in the Windows registry.

    Reads the ``InstallLocation`` value of the ``Git_is1`` uninstall key
    under ``HKEY_CURRENT_USER`` and then ``HKEY_LOCAL_MACHINE``; the
    ``Wow6432Node`` variant of the key is used when
    ``platform.machine()`` reports ``AMD64``. Keys that cannot be read
    are skipped, and only string (``REG_SZ``) values are yielded.
    """
    import platform
    import winreg

    if platform.machine() == "AMD64":
        subkey = (
            "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\"
            "CurrentVersion\\Uninstall\\Git_is1"
        )
    else:
        subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"

    hives = (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE)  # type: ignore[attr-defined,unused-ignore]
    for hive in hives:
        try:
            with winreg.OpenKey(hive, subkey) as handle:  # type: ignore[attr-defined,unused-ignore]
                location, value_type = winreg.QueryValueEx(handle, "InstallLocation")  # type: ignore[attr-defined,unused-ignore]
                if value_type == winreg.REG_SZ:  # type: ignore[attr-defined,unused-ignore]
                    yield location
        except OSError:
            # Key or value missing in this hive - try the next one.
            continue
# There is no set standard for system config dirs on windows. We try the
# following:
#   - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
#     Used if CGit installation (Git/bin/git.exe) is found in PATH or in the
#     system registry
def get_win_system_paths() -> Iterator[str]:
    """Get current Windows system Git config paths.

    Only returns the current Git for Windows config location, not legacy paths.
    At most one path is yielded: ``etc/gitconfig`` below the first Git
    directory found via ``PATH``, or, if none is found there, below the
    first one found in the registry.
    """
    # PATH takes priority; the registry is only consulted as a fallback.
    for finder in (_find_git_in_win_path, _find_git_in_win_reg):
        for git_dir in finder():
            yield os.path.join(git_dir, "etc", "gitconfig")
            return  # Only use the first found path
def get_win_legacy_system_paths() -> Iterator[str]:
    """Get legacy Windows system Git config paths.

    Returns all possible config paths including deprecated locations.
    This function can be used for diagnostics or migration purposes.

    Yields ``%PROGRAMDATA%/Git/config`` when ``PROGRAMDATA`` is set,
    followed by ``etc/gitconfig`` below every Git directory found via
    ``PATH`` and then via the registry.
    """
    # Deprecated PROGRAMDATA location
    program_data = os.environ.get("PROGRAMDATA")
    if program_data is not None:
        yield os.path.join(program_data, "Git", "config")

    # Every Git installation found, PATH results first
    for finder in (_find_git_in_win_path, _find_git_in_win_reg):
        for git_dir in finder():
            yield os.path.join(git_dir, "etc", "gitconfig")
class StackedConfig(Config):
    """Configuration which reads from multiple config files."""

    def __init__(
        self, backends: list[ConfigFile], writable: Optional[ConfigFile] = None
    ) -> None:
        """Initialize a StackedConfig.

        Args:
          backends: List of config files to read from (in order of precedence)
          writable: Optional config file to write changes to
        """
        self.backends = backends
        self.writable = writable

    def __repr__(self) -> str:
        """Return string representation of StackedConfig."""
        return f"<{self.__class__.__name__} for {self.backends!r}>"

    @classmethod
    def default(cls) -> "StackedConfig":
        """Create a StackedConfig with default system/user config files.

        Returns:
          StackedConfig built from ``default_backends()``, without a
          writable backend
        """
        backends = cls.default_backends()
        return cls(backends)

    @classmethod
    def default_backends(cls) -> list[ConfigFile]:
        """Retrieve the default configuration.

        See git-config(1) for details on the files searched.

        Returns:
          One ConfigFile per candidate path that exists, in search order
        """
        candidates = []

        # GIT_CONFIG_GLOBAL, when set, replaces the user config paths.
        global_override = os.environ.get("GIT_CONFIG_GLOBAL")
        if global_override is not None:
            candidates.append(global_override)
        else:
            candidates.append(os.path.expanduser("~/.gitconfig"))
            candidates.append(get_xdg_config_home_path("git", "config"))

        # GIT_CONFIG_SYSTEM, when set, replaces the system config paths;
        # otherwise they are used unless GIT_CONFIG_NOSYSTEM is present.
        system_override = os.environ.get("GIT_CONFIG_SYSTEM")
        if system_override is not None:
            candidates.append(system_override)
        elif "GIT_CONFIG_NOSYSTEM" not in os.environ:
            candidates.append("/etc/gitconfig")
            if sys.platform == "win32":
                candidates.extend(get_win_system_paths())

        logger.debug("Loading gitconfig from paths: %s", candidates)

        loaded = []
        for candidate in candidates:
            try:
                cf = ConfigFile.from_path(candidate)
                logger.debug("Successfully loaded gitconfig from: %s", candidate)
            except FileNotFoundError:
                # Missing files are expected; just move on.
                logger.debug("Gitconfig file not found: %s", candidate)
            else:
                loaded.append(cf)
        return loaded

    def get(self, section: SectionLike, name: NameLike) -> Value:
        """Get value from configuration.

        Backends are tried in order; the first one that has the setting
        wins.

        Raises:
          KeyError: If no backend has the setting
        """
        key = section if isinstance(section, tuple) else (section,)
        for backend in self.backends:
            with suppress(KeyError):
                return backend.get(key, name)
        raise KeyError(name)

    def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
        """Get multiple values from configuration.

        Yields the values of every backend in order, skipping backends
        that raise KeyError.
        """
        key = section if isinstance(section, tuple) else (section,)
        for backend in self.backends:
            with suppress(KeyError):
                yield from backend.get_multivar(key, name)

    def set(
        self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
    ) -> None:
        """Set value in configuration.

        Raises:
          NotImplementedError: If no writable backend was configured
        """
        target = self.writable
        if target is None:
            raise NotImplementedError(self.set)
        return target.set(section, name, value)

    def sections(self) -> Iterator[Section]:
        """Get all sections, each reported once across all backends."""
        already_seen = set()
        for backend in self.backends:
            for section in backend.sections():
                if section in already_seen:
                    continue
                already_seen.add(section)
                yield section
def read_submodules(
    path: Union[str, os.PathLike[str]],
) -> Iterator[tuple[bytes, bytes, bytes]]:
    """Read a .gitmodules file.

    Args:
      path: Path to the .gitmodules file

    Returns:
      Iterator over the tuples produced by ``parse_submodules`` for the
      parsed file
    """
    return parse_submodules(ConfigFile.from_path(path))
def parse_submodules(config: "ConfigFile") -> Iterator[tuple[bytes, bytes, bytes]]:
    """Parse a gitmodules GitConfig file, returning submodules.

    Sections that are not of the form ``[submodule "<name>"]`` are
    ignored, including sections without a subsection (which previously
    made this function raise ValueError).

    Args:
      config: A `ConfigFile`

    Returns:
      Iterator of tuples (submodule path, url, name),
      where name is quoted part of the section's name.
    """
    for section in config.sections():
        # A submodule section always has exactly (kind, name).
        if len(section) != 2:
            continue
        section_kind, section_name = section
        if section_kind != b"submodule":
            continue
        try:
            sm_path = config.get(section, b"path")
            sm_url = config.get(section, b"url")
        except KeyError:
            # If either path or url is missing, just ignore this
            # submodule entry and move on to the next one. This is
            # how git itself handles malformed .gitmodule entries.
            continue
        yield (sm_path, sm_url, section_name)
def iter_instead_of(config: "Config", push: bool = False) -> Iterable[tuple[str, str]]:
    """Iterate over insteadOf / pushInsteadOf values.

    Only ``[url "<base>"]`` sections are considered. A ``url`` section
    without a subsection has no replacement base and is skipped (it
    previously caused an IndexError).

    Args:
      config: Configuration to read the ``url`` sections from
      push: Also include ``pushInsteadOf`` values

    Returns:
      Iterable of (needle, replacement) pairs decoded as UTF-8, where
      needle is an insteadOf/pushInsteadOf value and replacement is the
      subsection name of its ``url`` section
    """
    for section in config.sections():
        if len(section) < 2 or section[0] != b"url":
            continue
        replacement = section[1]
        # list() forces evaluation inside the try so that a KeyError
        # raised while iterating is handled too.
        try:
            needles = list(config.get_multivar(section, "insteadOf"))
        except KeyError:
            needles = []
        if push:
            try:
                needles += list(config.get_multivar(section, "pushInsteadOf"))
            except KeyError:
                pass
        for needle in needles:
            assert isinstance(needle, bytes)
            yield needle.decode("utf-8"), replacement.decode("utf-8")
def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
    """Apply insteadOf / pushInsteadOf to a URL.

    Among all needles that are a prefix of ``orig_url``, the longest one
    wins (the first one seen on ties) and is swapped for its replacement.

    Args:
      config: Configuration providing the insteadOf rules
      orig_url: URL to rewrite
      push: Also consider ``pushInsteadOf`` rules

    Returns:
      The rewritten URL, or ``orig_url`` unchanged if no rule applies
    """
    best_length = 0
    rewritten = orig_url
    for needle, replacement in iter_instead_of(config, push):
        if orig_url.startswith(needle) and len(needle) > best_length:
            best_length = len(needle)
            rewritten = replacement + orig_url[best_length:]
    return rewritten