Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/netaddr/ip/sets.py: 14%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -----------------------------------------------------------------------------
2# Copyright (c) 2008 by David P. D. Moss. All rights reserved.
3#
4# Released under the BSD license. See the LICENSE file for details.
5# -----------------------------------------------------------------------------
6"""Set based operations for IP addresses and subnets."""
8import itertools as _itertools
9import sys as _sys
11from netaddr.ip import IPNetwork, IPAddress, IPRange, cidr_merge, cidr_exclude, iprange_to_cidrs
14def _subtract(supernet, subnets, subnet_idx, ranges):
15 """Calculate IPSet([supernet]) - IPSet(subnets).
17 Assumptions: subnets is sorted, subnet_idx points to the first
18 element in subnets that is a subnet of supernet.
20 Results are appended to the ranges parameter as tuples of in format
21 (version, first, last). Return value is the first subnet_idx that
22 does not point to a subnet of supernet (or len(subnets) if all
23 subsequents items are a subnet of supernet).
24 """
25 version = supernet._module.version
26 subnet = subnets[subnet_idx]
27 if subnet.first > supernet.first:
28 ranges.append((version, supernet.first, subnet.first - 1))
30 subnet_idx += 1
31 prev_subnet = subnet
32 while subnet_idx < len(subnets):
33 cur_subnet = subnets[subnet_idx]
35 if cur_subnet not in supernet:
36 break
37 if prev_subnet.last + 1 == cur_subnet.first:
38 # two adjacent, non-mergable IPNetworks
39 pass
40 else:
41 ranges.append((version, prev_subnet.last + 1, cur_subnet.first - 1))
43 subnet_idx += 1
44 prev_subnet = cur_subnet
46 first = prev_subnet.last + 1
47 last = supernet.last
48 if first <= last:
49 ranges.append((version, first, last))
51 return subnet_idx
54def _iter_merged_ranges(sorted_ranges):
55 """Iterate over sorted_ranges, merging where possible
57 Sorted ranges must be a sorted iterable of (version, first, last) tuples.
58 Merging occurs for pairs like [(4, 10, 42), (4, 43, 100)] which is merged
59 into (4, 10, 100), and leads to return value
60 ( IPAddress(10, 4), IPAddress(100, 4) ), which is suitable input for the
61 iprange_to_cidrs function.
62 """
63 if not sorted_ranges:
64 return
66 current_version, current_start, current_stop = sorted_ranges[0]
68 for next_version, next_start, next_stop in sorted_ranges[1:]:
69 if next_start == current_stop + 1 and next_version == current_version:
70 # Can be merged.
71 current_stop = next_stop
72 continue
73 # Cannot be merged.
74 yield (IPAddress(current_start, current_version), IPAddress(current_stop, current_version))
75 current_start = next_start
76 current_stop = next_stop
77 current_version = next_version
78 yield (IPAddress(current_start, current_version), IPAddress(current_stop, current_version))
81class IPSet(object):
82 """
83 Represents an unordered collection (set) of unique IP addresses and
84 subnets.
86 """
88 __slots__ = ('_cidrs', '__weakref__')
90 def __init__(self, iterable=None, flags=0):
91 """
92 Constructor.
94 :param iterable: (optional) an iterable containing IP addresses,
95 subnets or ranges.
97 :param flags: decides which rules are applied to the interpretation
98 of the addr value. See the :class:`IPAddress` documentation
99 for supported constant values.
101 """
102 if isinstance(iterable, IPNetwork):
103 self._cidrs = {iterable.cidr: True}
104 elif isinstance(iterable, IPRange):
105 self._cidrs = dict.fromkeys(iprange_to_cidrs(iterable[0], iterable[-1]), True)
106 elif isinstance(iterable, IPSet):
107 self._cidrs = dict.fromkeys(iterable.iter_cidrs(), True)
108 else:
109 self._cidrs = {}
110 if iterable is not None:
111 mergeable = []
112 for addr in iterable:
113 if isinstance(addr, int):
114 addr = IPAddress(addr, flags=flags)
115 mergeable.append(addr)
117 for cidr in cidr_merge(mergeable):
118 self._cidrs[cidr] = True
120 def __getstate__(self):
121 """:return: Pickled state of an ``IPSet`` object."""
122 return tuple([cidr.__getstate__() for cidr in self._cidrs])
124 def __setstate__(self, state):
125 """
126 :param state: data used to unpickle a pickled ``IPSet`` object.
128 """
129 self._cidrs = dict.fromkeys(
130 (IPNetwork((value, prefixlen), version=version) for value, prefixlen, version in state),
131 True,
132 )
134 def _compact_single_network(self, added_network):
135 """
136 Same as compact(), but assume that added_network is the only change and
137 that this IPSet was properly compacted before added_network was added.
138 This allows to perform compaction much faster. added_network must
139 already be present in self._cidrs.
140 """
141 added_first = added_network.first
142 added_last = added_network.last
143 added_version = added_network.version
145 # Check for supernets and subnets of added_network.
146 if added_network._prefixlen == added_network._module.width:
147 # This is a single IP address, i.e. /32 for IPv4 or /128 for IPv6.
148 # It does not have any subnets, so we only need to check for its
149 # potential supernets.
150 for potential_supernet in added_network.supernet():
151 if potential_supernet in self._cidrs:
152 del self._cidrs[added_network]
153 return
154 else:
155 # IPNetworks from self._cidrs that are subnets of added_network.
156 to_remove = []
157 for cidr in self._cidrs:
158 if cidr._module.version != added_version or cidr == added_network:
159 # We found added_network or some network of a different version.
160 continue
161 first = cidr.first
162 last = cidr.last
163 if first >= added_first and last <= added_last:
164 # cidr is a subnet of added_network. Remember to remove it.
165 to_remove.append(cidr)
166 elif first <= added_first and last >= added_last:
167 # cidr is a supernet of added_network. Remove added_network.
168 del self._cidrs[added_network]
169 # This IPSet was properly compacted before. Since added_network
170 # is removed now, it must again be properly compacted -> done.
171 assert not to_remove
172 return
173 for item in to_remove:
174 del self._cidrs[item]
176 # Check if added_network can be merged with another network.
178 # Note that merging can only happen between networks of the same
179 # prefixlen. This just leaves 2 candidates: The IPNetworks just before
180 # and just after the added_network.
181 # This can be reduced to 1 candidate: 10.0.0.0/24 and 10.0.1.0/24 can
182 # be merged into into 10.0.0.0/23. But 10.0.1.0/24 and 10.0.2.0/24
183 # cannot be merged. With only 1 candidate, we might as well make a
184 # dictionary lookup.
185 shift_width = added_network._module.width - added_network.prefixlen
186 while added_network.prefixlen != 0:
187 # figure out if the least significant bit of the network part is 0 or 1.
188 the_bit = (added_network._value >> shift_width) & 1
189 if the_bit:
190 candidate = added_network.previous()
191 else:
192 candidate = added_network.next()
194 if candidate not in self._cidrs:
195 # The only possible merge does not work -> merge done
196 return
197 # Remove added_network&candidate, add merged network.
198 del self._cidrs[candidate]
199 del self._cidrs[added_network]
200 added_network.prefixlen -= 1
201 # Be sure that we set the host bits to 0 when we move the prefixlen.
202 # Otherwise, adding 255.255.255.255/32 will result in a merged
203 # 255.255.255.255/24 network, but we want 255.255.255.0/24.
204 shift_width += 1
205 added_network._value = (added_network._value >> shift_width) << shift_width
206 self._cidrs[added_network] = True
208 def compact(self):
209 """
210 Compact internal list of `IPNetwork` objects using a CIDR merge.
211 """
212 cidrs = cidr_merge(self._cidrs)
213 self._cidrs = dict.fromkeys(cidrs, True)
215 def __hash__(self):
216 """
217 Raises ``TypeError`` if this method is called.
219 .. note:: IPSet objects are not hashable and cannot be used as \
220 dictionary keys or as members of other sets. \
221 """
222 raise TypeError('IP sets are unhashable!')
224 def __contains__(self, ip):
225 """
226 :param ip: An IP address or subnet.
228 :return: ``True`` if IP address or subnet is a member of this IP set.
229 """
230 # Iterating over self._cidrs is an O(n) operation: 1000 items in
231 # self._cidrs would mean 1000 loops. Iterating over all possible
232 # supernets loops at most 32 times for IPv4 or 128 times for IPv6,
233 # no matter how many CIDRs this object contains.
234 supernet = IPNetwork(ip)
235 if supernet in self._cidrs:
236 return True
237 while supernet._prefixlen:
238 supernet._prefixlen -= 1
239 if supernet in self._cidrs:
240 return True
241 return False
243 def __bool__(self):
244 """Return True if IPSet contains at least one IP, else False"""
245 return bool(self._cidrs)
247 def __iter__(self):
248 """
249 :return: an iterator over the IP addresses within this IP set.
250 """
251 return _itertools.chain(*sorted(self._cidrs))
253 def iter_cidrs(self):
254 """
255 :return: an iterator over individual IP subnets within this IP set.
256 """
257 return sorted(self._cidrs)
259 def add(self, addr, flags=0):
260 """
261 Adds an IP address or subnet or IPRange to this IP set. Has no effect if
262 it is already present.
264 Note that where possible the IP address or subnet is merged with other
265 members of the set to form more concise CIDR blocks.
267 :param addr: An IP address or subnet in either string or object form, or
268 an IPRange object.
270 :param flags: decides which rules are applied to the interpretation
271 of the addr value. See the :class:`IPAddress` documentation
272 for supported constant values.
274 """
275 if isinstance(addr, IPRange):
276 new_cidrs = dict.fromkeys(iprange_to_cidrs(addr[0], addr[-1]), True)
277 self._cidrs.update(new_cidrs)
278 self.compact()
279 return
280 if isinstance(addr, IPNetwork):
281 # Networks like 10.1.2.3/8 need to be normalized to 10.0.0.0/8
282 addr = addr.cidr
283 elif isinstance(addr, int):
284 addr = IPNetwork(IPAddress(addr, flags=flags))
285 else:
286 addr = IPNetwork(addr)
288 self._cidrs[addr] = True
289 self._compact_single_network(addr)
291 def remove(self, addr, flags=0):
292 """
293 Removes an IP address or subnet or IPRange from this IP set. Does
294 nothing if it is not already a member.
296 Note that this method behaves more like discard() found in regular
297 Python sets because it doesn't raise KeyError exceptions if the
298 IP address or subnet is question does not exist. It doesn't make sense
299 to fully emulate that behaviour here as IP sets contain groups of
300 individual IP addresses as individual set members using IPNetwork
301 objects.
303 :param addr: An IP address or subnet, or an IPRange.
305 :param flags: decides which rules are applied to the interpretation
306 of the addr value. See the :class:`IPAddress` documentation
307 for supported constant values.
309 """
310 if isinstance(addr, IPRange):
311 cidrs = iprange_to_cidrs(addr[0], addr[-1])
312 for cidr in cidrs:
313 self.remove(cidr)
314 return
316 if isinstance(addr, int):
317 addr = IPAddress(addr, flags=flags)
318 else:
319 addr = IPNetwork(addr)
321 # This add() is required for address blocks provided that are larger
322 # than blocks found within the set but have overlaps. e.g. :-
323 #
324 # >>> IPSet(['192.0.2.0/24']).remove('192.0.2.0/23')
325 # IPSet([])
326 #
327 self.add(addr)
329 remainder = None
330 matching_cidr = None
332 # Search for a matching CIDR and exclude IP from it.
333 for cidr in self._cidrs:
334 if addr in cidr:
335 remainder = cidr_exclude(cidr, addr)
336 matching_cidr = cidr
337 break
339 # Replace matching CIDR with remaining CIDR elements.
340 if remainder is not None:
341 del self._cidrs[matching_cidr]
342 for cidr in remainder:
343 self._cidrs[cidr] = True
344 # No call to self.compact() is needed. Removing an IPNetwork cannot
345 # create mergeable networks.
347 def pop(self):
348 """
349 Removes and returns an arbitrary IP address or subnet from this IP
350 set.
352 :return: An IP address or subnet.
353 """
354 return self._cidrs.popitem()[0]
356 def isdisjoint(self, other):
357 """
358 :param other: an IP set.
360 :return: ``True`` if this IP set has no elements (IP addresses
361 or subnets) in common with other. Intersection *must* be an
362 empty set.
363 """
364 result = self.intersection(other)
365 return not result
367 def copy(self):
368 """:return: a shallow copy of this IP set."""
369 obj_copy = self.__class__()
370 obj_copy._cidrs.update(self._cidrs)
371 return obj_copy
373 def update(self, iterable, flags=0):
374 """
375 Update the contents of this IP set with the union of itself and
376 other IP set.
378 :param iterable: an iterable containing IP addresses, subnets or ranges.
380 :param flags: decides which rules are applied to the interpretation
381 of the addr value. See the :class:`IPAddress` documentation
382 for supported constant values.
384 """
385 if isinstance(iterable, IPSet):
386 self._cidrs = dict.fromkeys(
387 (
388 ip
389 for ip in cidr_merge(
390 _itertools.chain(self._cidrs.keys(), iterable._cidrs.keys())
391 )
392 ),
393 True,
394 )
395 return
396 elif isinstance(iterable, (IPNetwork, IPRange)):
397 self.add(iterable)
398 return
400 if not hasattr(iterable, '__iter__'):
401 raise TypeError('an iterable was expected!')
402 # An iterable containing IP addresses or subnets.
403 mergeable = []
404 for addr in iterable:
405 if isinstance(addr, int):
406 addr = IPAddress(addr, flags=flags)
407 mergeable.append(addr)
409 for cidr in cidr_merge(_itertools.chain(self._cidrs.keys(), mergeable)):
410 self._cidrs[cidr] = True
412 self.compact()
414 def clear(self):
415 """Remove all IP addresses and subnets from this IP set."""
416 self._cidrs = {}
418 def __eq__(self, other):
419 """
420 :param other: an IP set
422 :return: ``True`` if this IP set is equivalent to the ``other`` IP set,
423 ``False`` otherwise.
424 """
425 try:
426 return self._cidrs == other._cidrs
427 except AttributeError:
428 return NotImplemented
430 def __ne__(self, other):
431 """
432 :param other: an IP set
434 :return: ``False`` if this IP set is equivalent to the ``other`` IP set,
435 ``True`` otherwise.
436 """
437 try:
438 return self._cidrs != other._cidrs
439 except AttributeError:
440 return NotImplemented
442 def __lt__(self, other):
443 """
444 :param other: an IP set
446 :return: ``True`` if this IP set is less than the ``other`` IP set,
447 ``False`` otherwise.
448 """
449 if not hasattr(other, '_cidrs'):
450 return NotImplemented
452 return self.size < other.size and self.issubset(other)
454 def issubset(self, other):
455 """
456 :param other: an IP set.
458 :return: ``True`` if every IP address and subnet in this IP set
459 is found within ``other``.
460 """
461 for cidr in self._cidrs:
462 if cidr not in other:
463 return False
464 return True
466 __le__ = issubset
468 def __gt__(self, other):
469 """
470 :param other: an IP set.
472 :return: ``True`` if this IP set is greater than the ``other`` IP set,
473 ``False`` otherwise.
474 """
475 if not hasattr(other, '_cidrs'):
476 return NotImplemented
478 return self.size > other.size and self.issuperset(other)
480 def issuperset(self, other):
481 """
482 :param other: an IP set.
484 :return: ``True`` if every IP address and subnet in other IP set
485 is found within this one.
486 """
487 if not hasattr(other, '_cidrs'):
488 return NotImplemented
490 for cidr in other._cidrs:
491 if cidr not in self:
492 return False
493 return True
495 __ge__ = issuperset
497 def union(self, other):
498 """
499 :param other: an IP set.
501 :return: the union of this IP set and another as a new IP set
502 (combines IP addresses and subnets from both sets).
503 """
504 ip_set = self.copy()
505 ip_set.update(other)
506 return ip_set
508 __or__ = union
510 def intersection(self, other):
511 """
512 :param other: an IP set.
514 :return: the intersection of this IP set and another as a new IP set.
515 (IP addresses and subnets common to both sets).
516 """
517 result_cidrs = {}
519 own_nets = sorted(self._cidrs)
520 other_nets = sorted(other._cidrs)
521 own_idx = 0
522 other_idx = 0
523 own_len = len(own_nets)
524 other_len = len(other_nets)
525 while own_idx < own_len and other_idx < other_len:
526 own_cur = own_nets[own_idx]
527 other_cur = other_nets[other_idx]
529 if own_cur == other_cur:
530 result_cidrs[own_cur] = True
531 own_idx += 1
532 other_idx += 1
533 elif own_cur in other_cur:
534 result_cidrs[own_cur] = True
535 own_idx += 1
536 elif other_cur in own_cur:
537 result_cidrs[other_cur] = True
538 other_idx += 1
539 else:
540 # own_cur and other_cur have nothing in common
541 if own_cur < other_cur:
542 own_idx += 1
543 else:
544 other_idx += 1
546 # We ran out of networks in own_nets or other_nets. Either way, there
547 # can be no further result_cidrs.
548 result = IPSet()
549 result._cidrs = result_cidrs
550 return result
552 __and__ = intersection
554 def symmetric_difference(self, other):
555 """
556 :param other: an IP set.
558 :return: the symmetric difference of this IP set and another as a new
559 IP set (all IP addresses and subnets that are in exactly one
560 of the sets).
561 """
562 # In contrast to intersection() and difference(), we cannot construct
563 # the result_cidrs easily. Some cidrs may have to be merged, e.g. for
564 # IPSet(["10.0.0.0/32"]).symmetric_difference(IPSet(["10.0.0.1/32"])).
565 result_ranges = []
567 own_nets = sorted(self._cidrs)
568 other_nets = sorted(other._cidrs)
569 own_idx = 0
570 other_idx = 0
571 own_len = len(own_nets)
572 other_len = len(other_nets)
573 while own_idx < own_len and other_idx < other_len:
574 own_cur = own_nets[own_idx]
575 other_cur = other_nets[other_idx]
577 if own_cur == other_cur:
578 own_idx += 1
579 other_idx += 1
580 elif own_cur in other_cur:
581 own_idx = _subtract(other_cur, own_nets, own_idx, result_ranges)
582 other_idx += 1
583 elif other_cur in own_cur:
584 other_idx = _subtract(own_cur, other_nets, other_idx, result_ranges)
585 own_idx += 1
586 else:
587 # own_cur and other_cur have nothing in common
588 if own_cur < other_cur:
589 result_ranges.append((own_cur._module.version, own_cur.first, own_cur.last))
590 own_idx += 1
591 else:
592 result_ranges.append(
593 (other_cur._module.version, other_cur.first, other_cur.last)
594 )
595 other_idx += 1
597 # If the above loop terminated because it processed all cidrs of
598 # "other", then any remaining cidrs in self must be part of the result.
599 while own_idx < own_len:
600 own_cur = own_nets[own_idx]
601 result_ranges.append((own_cur._module.version, own_cur.first, own_cur.last))
602 own_idx += 1
604 # If the above loop terminated because it processed all cidrs of
605 # self, then any remaining cidrs in "other" must be part of the result.
606 while other_idx < other_len:
607 other_cur = other_nets[other_idx]
608 result_ranges.append((other_cur._module.version, other_cur.first, other_cur.last))
609 other_idx += 1
611 result = IPSet()
612 for start, stop in _iter_merged_ranges(result_ranges):
613 cidrs = iprange_to_cidrs(start, stop)
614 for cidr in cidrs:
615 result._cidrs[cidr] = True
616 return result
618 __xor__ = symmetric_difference
620 def difference(self, other):
621 """
622 :param other: an IP set.
624 :return: the difference between this IP set and another as a new IP
625 set (all IP addresses and subnets that are in this IP set but
626 not found in the other.)
627 """
628 result_ranges = []
629 result_cidrs = {}
631 own_nets = sorted(self._cidrs)
632 other_nets = sorted(other._cidrs)
633 own_idx = 0
634 other_idx = 0
635 own_len = len(own_nets)
636 other_len = len(other_nets)
637 while own_idx < own_len and other_idx < other_len:
638 own_cur = own_nets[own_idx]
639 other_cur = other_nets[other_idx]
641 if own_cur == other_cur:
642 own_idx += 1
643 other_idx += 1
644 elif own_cur in other_cur:
645 own_idx += 1
646 elif other_cur in own_cur:
647 other_idx = _subtract(own_cur, other_nets, other_idx, result_ranges)
648 own_idx += 1
649 else:
650 # own_cur and other_cur have nothing in common
651 if own_cur < other_cur:
652 result_cidrs[own_cur] = True
653 own_idx += 1
654 else:
655 other_idx += 1
657 # If the above loop terminated because it processed all cidrs of
658 # "other", then any remaining cidrs in self must be part of the result.
659 while own_idx < own_len:
660 result_cidrs[own_nets[own_idx]] = True
661 own_idx += 1
663 for start, stop in _iter_merged_ranges(result_ranges):
664 for cidr in iprange_to_cidrs(start, stop):
665 result_cidrs[cidr] = True
667 result = IPSet()
668 result._cidrs = result_cidrs
669 return result
671 __sub__ = difference
673 def __len__(self):
674 """
675 :return: the cardinality of this IP set (i.e. sum of individual IP \
676 addresses). Raises ``IndexError`` if size > maxsize (a Python \
677 limitation). Use the .size property for subnets of any size.
678 """
679 size = self.size
680 if size > _sys.maxsize:
681 raise IndexError(
682 'range contains more than %d (sys.maxsize) IP addresses!'
683 'Use the .size property instead.' % _sys.maxsize
684 )
685 return size
687 @property
688 def size(self):
689 """
690 The cardinality of this IP set (based on the number of individual IP
691 addresses including those implicitly defined in subnets).
692 """
693 return sum([cidr.size for cidr in self._cidrs])
695 def __repr__(self):
696 """:return: Python statement to create an equivalent object"""
697 return 'IPSet(%r)' % [str(c) for c in sorted(self._cidrs)]
699 __str__ = __repr__
701 def iscontiguous(self):
702 """
703 Returns True if the members of the set form a contiguous IP
704 address range (with no gaps), False otherwise.
706 :return: ``True`` if the ``IPSet`` object is contiguous.
707 """
708 cidrs = self.iter_cidrs()
709 if len(cidrs) > 1:
710 previous = cidrs[0][0]
711 for cidr in cidrs:
712 if cidr[0] != previous:
713 return False
714 previous = cidr[-1] + 1
715 return True
717 def iprange(self):
718 """
719 Generates an IPRange for this IPSet, if all its members
720 form a single contiguous sequence.
722 Raises ``ValueError`` if the set is not contiguous.
724 :return: An ``IPRange`` for all IPs in the IPSet.
725 """
726 if self.iscontiguous():
727 cidrs = self.iter_cidrs()
728 if not cidrs:
729 return None
730 return IPRange(cidrs[0][0], cidrs[-1][-1])
731 else:
732 raise ValueError('IPSet is not contiguous')
734 def iter_ipranges(self):
735 """Generate the merged IPRanges for this IPSet.
737 In contrast to self.iprange(), this will work even when the IPSet is
738 not contiguous. Adjacent IPRanges will be merged together, so you
739 get the minimal number of IPRanges.
740 """
741 sorted_ranges = [
742 (cidr._module.version, cidr.first, cidr.last) for cidr in self.iter_cidrs()
743 ]
745 for start, stop in _iter_merged_ranges(sorted_ranges):
746 yield IPRange(start, stop)