Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/uritools/__init__.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

395 statements  

1"""RFC 3986 compliant, scheme-agnostic replacement for `urllib.parse`. 

2 

3This module defines RFC 3986 compliant replacements for the most 

4commonly used functions of the Python Standard Library 

5:mod:`urllib.parse` module. 

6 

7""" 

8 

9import collections 

10import collections.abc 

11import ipaddress 

12import numbers 

13import re 

14from string import hexdigits 

15 

# Names exported by ``from uritools import *``.
__all__ = (
    "GEN_DELIMS", "RESERVED", "SUB_DELIMS", "UNRESERVED",
    "isabspath", "isabsuri", "isnetpath", "isrelpath", "issamedoc", "isuri",
    "uricompose", "uridecode", "uridefrag", "uriencode",
    "urijoin", "urisplit", "uriunsplit",
)

__version__ = "6.1.2"

37 

38 

# RFC 3986 2.2. Reserved Characters
#
#   reserved    = gen-delims / sub-delims
#
#   gen-delims  = ":" / "/" / "?" / "#" / "[" / "]" / "@"
#
#   sub-delims  = "!" / "$" / "&" / "'" / "(" / ")"
#               / "*" / "+" / "," / ";" / "="
#
GEN_DELIMS = ":/?#[]@"
SUB_DELIMS = "!$&'()*+,;="
RESERVED = GEN_DELIMS + SUB_DELIMS

# RFC 3986 2.3. Unreserved Characters
#
#   unreserved  = ALPHA / DIGIT / "-" / "." / "_" / "~"
#
UNRESERVED = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789"
    "-._~"
)

# Byte values (ints) of the unreserved characters, for fast membership tests.
_unreserved = frozenset(UNRESERVED.encode())

# Per-"safe" lookup tables mapping each byte value 0..255 to its encoded form.
# The entry for b"" leaves only unreserved characters unescaped.
#
# RFC 3986 2.1: For consistency, URI producers and normalizers should
# use uppercase hexadecimal digits for all percent-encodings.
_encoded = {
    b"": [
        bytes((octet,)) if octet in _unreserved else b"%%%02X" % octet
        for octet in range(256)
    ]
}

# Two-hex-digit byte strings (any letter case) mapped to the byte they denote.
_decoded = {
    (hi + lo).encode(): bytes((int(hi + lo, 16),))
    for hi in hexdigits
    for lo in hexdigits
}

71 

72 

def uriencode(uristring, safe="", encoding="utf-8", errors="strict"):
    """Percent-encode a URI string or string component and return bytes.

    `uristring` is used as-is when it is :class:`bytes`, otherwise it is
    encoded with `encoding` and `errors` first.  Characters listed in
    `safe` (ASCII only when given as :class:`str`) are left unescaped in
    addition to the unreserved characters.

    """
    raw = uristring if isinstance(uristring, bytes) else uristring.encode(encoding, errors)
    safe_key = safe if isinstance(safe, bytes) else safe.encode("ascii")
    # FIXME: one lookup table is cached per distinct `safe` value and never
    # evicted, so the cache can grow without bounds if callers pass arbitrary
    # safe values - consider @functools.lru_cache instead of a plain dict.
    table = _encoded.get(safe_key)
    if table is None:
        # Start from the default table and let every safe byte map to itself.
        table = list(_encoded[b""])
        for octet in safe_key:
            table[octet] = bytes((octet,))
        _encoded[safe_key] = table
    return b"".join([table[octet] for octet in raw])

90 

91 

def uridecode(uristring, encoding="utf-8", errors="strict"):
    """Replace percent-escapes in a URI string or string component.

    The result is decoded to :class:`str` using `encoding` and `errors`,
    or returned as :class:`bytes` when `encoding` is :const:`None`.  A
    ``%`` that is not followed by two hex digits is kept unchanged.

    """
    if isinstance(uristring, bytes):
        raw = uristring
    else:
        raw = uristring.encode(encoding or "ascii", errors)
    head, *escapes = raw.split(b"%")
    pieces = [head]
    for chunk in escapes:
        # Each chunk followed a "%": its first two bytes are the candidate
        # hex pair, the remainder is literal text.
        hexpair = chunk[:2]
        pieces.append(_decoded.get(hexpair, b"%" + hexpair))
        pieces.append(chunk[2:])
    data = b"".join(pieces)
    return data if encoding is None else data.decode(encoding, errors)

107 

108 

class DefragResult(collections.namedtuple("DefragResult", "uri fragment")):
    """Named tuple ``(uri, fragment)`` holding :func:`uridefrag` results."""

    __slots__ = ()  # no per-instance __dict__

    def geturi(self):
        """Return the URI with its fragment (if any) re-attached."""
        if self.fragment is None:
            return self.uri
        # Pick the separator in the same string type as the fragment.
        separator = b"#" if isinstance(self.fragment, bytes) else "#"
        return self.uri + separator + self.fragment

    def getfragment(self, default=None, encoding="utf-8", errors="strict"):
        """Return the percent-decoded fragment, or `default` when the
        tuple holds no fragment.

        """
        # FIXME: by default, getfragment() should return bytes if geturi() returns bytes
        if self.fragment is None:
            return default
        return uridecode(self.fragment, encoding, errors)

135 

136 

class SplitResult(
    collections.namedtuple("SplitResult", "scheme authority path query fragment")
):
    """Base class to hold :func:`urisplit` results.

    The methods below read class attributes that this base class does not
    define itself: the delimiter constants ``_COLON``, ``_SLASH``,
    ``_QUEST``, ``_HASH``, ``_LBRACKET``, ``_RBRACKET``, ``_AT``, ``_DOT``,
    ``_DOTDOT``, ``_EMPTY``, ``_EQ`` and ``_DIGITS``, plus a ``_match``
    classmethod.  Subclasses provide them in the string type (``str`` or
    ``bytes``) of the components they hold.

    """

    __slots__ = ()  # prevent creation of instance dictionary

    @property
    def userinfo(self):
        """The raw userinfo subcomponent, or :const:`None` if the authority
        is missing or contains no ``@``.

        """
        authority = self.authority
        if authority is None:
            return None
        userinfo, present, _ = authority.rpartition(self._AT)
        if present:
            return userinfo
        else:
            return None

    @property
    def host(self):
        """The raw host subcomponent (IP literals keep their brackets), or
        :const:`None` if there is no authority.

        """
        authority = self.authority
        if authority is None:
            return None
        _, _, hostinfo = authority.rpartition(self._AT)
        host, present, port = hostinfo.rpartition(self._COLON)
        # Only strip a trailing ":port" when a colon was actually found and
        # what follows it is all digits.  Without the `present` check an
        # all-digit host with no port (e.g. "123") would be mistaken for a
        # port and an empty host returned.  A non-digit tail also covers
        # colons inside a bracketed IP literal such as "[::1]".
        if present and not port.lstrip(self._DIGITS):
            return host
        else:
            return hostinfo

    @property
    def port(self):
        """The raw (possibly empty) port subcomponent, or :const:`None` if
        the authority is missing or has no port.

        """
        authority = self.authority
        if authority is None:
            return None
        _, present, port = authority.rpartition(self._COLON)
        if present and not port.lstrip(self._DIGITS):
            return port
        else:
            return None

    def geturi(self):
        """Return the re-combined version of the original URI reference as a
        string.

        """
        scheme, authority, path, query, fragment = self

        # RFC 3986 5.3. Component Recomposition
        result = []
        if scheme is not None:
            result.extend([scheme, self._COLON])
        if authority is not None:
            result.extend([self._SLASH, self._SLASH, authority])
        result.append(path)
        if query is not None:
            result.extend([self._QUEST, query])
        if fragment is not None:
            result.extend([self._HASH, fragment])
        return self._EMPTY.join(result)

    def getscheme(self, default=None):
        """Return the URI scheme in canonical (lowercase) form, or `default`
        if the original URI reference did not contain a scheme component.

        """
        # FIXME: should getscheme() return bytes if geturi() returns bytes?
        scheme = self.scheme
        if scheme is None:
            return default
        elif isinstance(scheme, bytes):
            return scheme.decode("ascii").lower()
        else:
            return scheme.lower()

    def getauthority(self, default=None, encoding="utf-8", errors="strict"):
        """Return the decoded userinfo, host and port subcomponents of the URI
        authority as a three-item tuple.

        `default`, if given, must be a three-item sequence supplying the
        fallback for each subcomponent; otherwise :exc:`TypeError` or
        :exc:`ValueError` is raised.

        """
        # TBD: (userinfo, host, port) kwargs, default string?
        if default is None:
            default = (None, None, None)
        elif not isinstance(default, collections.abc.Sequence):
            raise TypeError("Invalid default type")
        elif len(default) != 3:
            raise ValueError("Invalid default length")
        # TODO: this could be much more efficient by using a dedicated regex
        return (
            self.getuserinfo(default[0], encoding, errors),
            self.gethost(default[1], errors),
            self.getport(default[2]),
        )

    def getuserinfo(self, default=None, encoding="utf-8", errors="strict"):
        """Return the decoded userinfo subcomponent of the URI authority, or
        `default` if the original URI reference did not contain a
        userinfo field.

        """
        userinfo = self.userinfo
        if userinfo is None:
            return default
        else:
            return uridecode(userinfo, encoding, errors)

    def gethost(self, default=None, errors="strict"):
        """Return the decoded host subcomponent of the URI authority as a
        string or an :mod:`ipaddress` address object, or `default` if
        the original URI reference did not contain a host.

        Raises :exc:`ValueError` for mismatched brackets or an IP literal
        that cannot be parsed.

        """
        host = self.host
        if host is None or (not host and default is not None):
            return default
        elif host.startswith(self._LBRACKET) and host.endswith(self._RBRACKET):
            return self.__parse_ip_literal(host[1:-1])
        elif host.startswith(self._LBRACKET) or host.endswith(self._RBRACKET):
            raise ValueError("Invalid host %r: mismatched brackets" % host)
        # TODO: faster check for IPv4 address?
        try:
            if isinstance(host, bytes):
                return ipaddress.IPv4Address(host.decode("ascii"))
            else:
                return ipaddress.IPv4Address(host)
        except ValueError:
            # Not an IPv4 address: treat as a registered name.
            return uridecode(host, "utf-8", errors).lower()

    def getport(self, default=None):
        """Return the port subcomponent of the URI authority as an
        :class:`int`, or `default` if the original URI reference did
        not contain a port or if the port was empty.

        """
        port = self.port
        if port:
            return int(port)
        else:
            return default

    def getpath(self, encoding="utf-8", errors="strict"):
        """Return the normalized decoded URI path."""
        path = self.__remove_dot_segments(self.path)
        return uridecode(path, encoding, errors)

    def getquery(self, default=None, encoding="utf-8", errors="strict"):
        """Return the decoded query string, or `default` if the original URI
        reference did not contain a query component.

        """
        query = self.query
        if query is None:
            return default
        else:
            return uridecode(query, encoding, errors)

    def getquerydict(self, sep="&", encoding="utf-8", errors="strict"):
        """Split the query component into individual `name=value` pairs
        separated by `sep` and return a dictionary of query variables.
        The dictionary keys are the unique query variable names and
        the values are lists of values for each name.

        """
        result = collections.defaultdict(list)
        for name, value in self.getquerylist(sep, encoding, errors):
            result[name].append(value)
        return result

    def getquerylist(self, sep="&", encoding="utf-8", errors="strict"):
        """Split the query component into individual `name=value` pairs
        separated by `sep`, and return a list of `(name, value)`
        tuples.

        A pair without ``=`` yields :const:`None` as its value; empty
        pairs are skipped.

        """
        # Convert `sep` to the query's string type before splitting.
        if not self.query:
            return []
        elif isinstance(sep, type(self.query)):
            qsl = self.query.split(sep)
        elif isinstance(sep, bytes):
            qsl = self.query.split(sep.decode("ascii"))
        else:
            qsl = self.query.split(sep.encode("ascii"))
        result = []
        for parts in [qs.partition(self._EQ) for qs in qsl if qs]:
            name = uridecode(parts[0], encoding, errors)
            if parts[1]:
                value = uridecode(parts[2], encoding, errors)
            else:
                value = None
            result.append((name, value))
        return result

    def getfragment(self, default=None, encoding="utf-8", errors="strict"):
        """Return the decoded fragment identifier, or `default` if the
        original URI reference did not contain a fragment component.

        """
        fragment = self.fragment
        if fragment is None:
            return default
        else:
            return uridecode(fragment, encoding, errors)

    def isuri(self):
        """Return :const:`True` if this is a URI."""
        return self.scheme is not None

    def isabsuri(self):
        """Return :const:`True` if this is an absolute URI."""
        return self.scheme is not None and self.fragment is None

    def isnetpath(self):
        """Return :const:`True` if this is a network-path reference."""
        return self.scheme is None and self.authority is not None

    def isabspath(self):
        """Return :const:`True` if this is an absolute-path reference."""
        return (
            self.scheme is None
            and self.authority is None
            and self.path.startswith(self._SLASH)
        )

    def isrelpath(self):
        """Return :const:`True` if this is a relative-path reference."""
        return (
            self.scheme is None
            and self.authority is None
            and not self.path.startswith(self._SLASH)
        )

    def issamedoc(self):
        """Return :const:`True` if this is a same-document reference."""
        return (
            self.scheme is None
            and self.authority is None
            and not self.path
            and self.query is None
        )

    def transform(self, ref, strict=False):
        """Transform a URI reference relative to `self` into a
        :class:`SplitResult` representing its target URI.

        Unless `strict` is true, a reference scheme equal to the base
        scheme is ignored and the reference is resolved as if it had none.

        """
        scheme, authority, path, query, fragment = self._match(ref).groups()

        # RFC 3986 5.2.2. Transform References
        if scheme is not None and (strict or scheme != self.scheme):
            path = self.__remove_dot_segments(path)
        elif authority is not None:
            scheme = self.scheme
            path = self.__remove_dot_segments(path)
        elif not path:
            scheme = self.scheme
            authority = self.authority
            path = self.path
            query = self.query if query is None else query
        elif path.startswith(self._SLASH):
            scheme = self.scheme
            authority = self.authority
            path = self.__remove_dot_segments(path)
        else:
            scheme = self.scheme
            authority = self.authority
            path = self.__remove_dot_segments(self.__merge(path))
        return type(self)(scheme, authority, path, query, fragment)

    def __merge(self, path):
        # RFC 3986 5.2.3. Merge Paths
        if self.authority is not None and not self.path:
            return self._SLASH + path
        else:
            # Replace everything after the base path's last slash with
            # `path`; parts[1] is empty when the base path has no slash.
            parts = self.path.rpartition(self._SLASH)
            return parts[1].join((parts[0], path))

    @classmethod
    def __remove_dot_segments(cls, path):
        # RFC 3986 5.2.4. Remove Dot Segments
        pseg = []
        for s in path.split(cls._SLASH):
            if s == cls._DOT:
                continue
            elif s != cls._DOTDOT:
                pseg.append(s)
            elif len(pseg) == 1 and not pseg[0]:
                # ".." directly below the root of an absolute path: drop it
                continue
            elif pseg and pseg[-1] != cls._DOTDOT:
                pseg.pop()
            else:
                # leading ".." of a relative path is kept
                pseg.append(s)
        # adjust for trailing '/.' or '/..'
        if path.rpartition(cls._SLASH)[2] in (cls._DOT, cls._DOTDOT):
            pseg.append(cls._EMPTY)
        if path and len(pseg) == 1 and pseg[0] == cls._EMPTY:
            pseg.insert(0, cls._DOT)
        return cls._SLASH.join(pseg)

    @classmethod
    def __parse_ip_literal(cls, address):
        # RFC 3986 3.2.2: In anticipation of future, as-yet-undefined
        # IP literal address formats, an implementation may use an
        # optional version flag to indicate such a format explicitly
        # rather than rely on heuristic determination.
        #
        #  IP-literal = "[" ( IPv6address / IPvFuture  ) "]"
        #
        #  IPvFuture  = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
        #
        # If a URI containing an IP-literal that starts with "v"
        # (case-insensitive), indicating that the version flag is
        # present, is dereferenced by an application that does not
        # know the meaning of that version flag, then the application
        # should return an appropriate error for "address mechanism
        # not supported".
        if isinstance(address, bytes):
            address = address.decode("ascii")
        # The version flag is case-insensitive, so check both "v" and "V".
        if address.startswith(("v", "V")):
            raise ValueError("address mechanism not supported")
        return ipaddress.IPv6Address(address)

457 

458 

459# TODO: make private? 

class SplitResultBytes(SplitResult):
    """:class:`SplitResult` variant whose constants and pattern are :class:`bytes`."""

    __slots__ = ()  # no per-instance __dict__

    # RFC 3986 2.2 gen-delims
    _COLON = b":"
    _SLASH = b"/"
    _QUEST = b"?"
    _HASH = b"#"
    _LBRACKET = b"["
    _RBRACKET = b"]"
    _AT = b"@"

    # RFC 3986 3.3 dot-segments
    _DOT = b"."
    _DOTDOT = b".."

    _EMPTY = b""
    _EQ = b"="

    _DIGITS = b"0123456789"

    # RFC 3986 Appendix B
    _RE = re.compile(
        rb"""
        (?:([A-Za-z][A-Za-z0-9+.-]*):)?  # scheme (RFC 3986 3.1)
        (?://([^/?#]*))?                 # authority
        ([^?#]*)                         # path
        (?:\?([^#]*))?                   # query
        (?:\#(.*))?                      # fragment
        """,
        flags=re.VERBOSE,
    )

    @classmethod
    def _match(cls, ref):
        """Match the bytes reference `ref` against the Appendix B pattern."""
        return cls._RE.match(ref)

496 

497 

498# TODO: make private? 

class SplitResultString(SplitResult):
    """:class:`SplitResult` variant whose constants and pattern are :class:`str`."""

    __slots__ = ()  # no per-instance __dict__

    # RFC 3986 2.2 gen-delims
    _COLON = ":"
    _SLASH = "/"
    _QUEST = "?"
    _HASH = "#"
    _LBRACKET = "["
    _RBRACKET = "]"
    _AT = "@"

    # RFC 3986 3.3 dot-segments
    _DOT = "."
    _DOTDOT = ".."

    _EMPTY = ""
    _EQ = "="

    _DIGITS = "0123456789"

    # RFC 3986 Appendix B
    _RE = re.compile(
        r"""
        (?:([A-Za-z][A-Za-z0-9+.-]*):)?  # scheme (RFC 3986 3.1)
        (?://([^/?#]*))?                 # authority
        ([^?#]*)                         # path
        (?:\?([^#]*))?                   # query
        (?:\#(.*))?                      # fragment
        """,
        flags=re.VERBOSE,
    )

    @classmethod
    def _match(cls, ref):
        """Match the string reference `ref` against the Appendix B pattern."""
        return cls._RE.match(ref)

535 

536 

def uridefrag(uristring):
    """Split a URI reference string at its first ``#`` and return a
    :class:`DefragResult`; the fragment is :const:`None` when there is
    no ``#``.

    """
    hash_sign = b"#" if isinstance(uristring, bytes) else "#"
    uri, found, fragment = uristring.partition(hash_sign)
    return DefragResult(uri, fragment if found else None)

544 

545 

def urisplit(uristring):
    """Split a well-formed URI reference string into a tuple with five
    components corresponding to a URI's general structure::

      <scheme>://<authority>/<path>?<query>#<fragment>

    The result class is chosen by whether `uristring` is :class:`bytes`.

    """
    cls = SplitResultBytes if isinstance(uristring, bytes) else SplitResultString
    components = cls._match(uristring).groups()
    return cls(*components)

558 

559 

def uriunsplit(parts):
    """Combine the elements of a five-item iterable into a URI reference's
    string representation.

    The type of the path element decides whether the bytes or the string
    flavour is used for recomposition.

    """
    scheme, authority, path, query, fragment = parts
    cls = SplitResultBytes if isinstance(path, bytes) else SplitResultString
    return cls(scheme, authority, path, query, fragment).geturi()

571 

572 

def urijoin(base, ref, strict=False):
    """Convert a URI reference relative to a base URI to its target URI
    string.

    """
    if not isinstance(base, type(ref)):
        # Mixed types: decode the side that needs it so both agree.
        if isinstance(base, bytes):
            base = base.decode()
        else:
            ref = ref.decode()
    return urisplit(base).transform(ref, strict).geturi()

584 

585 

def isuri(uristring):
    """Return :const:`True` if `uristring` is a URI."""
    parts = urisplit(uristring)
    return parts.isuri()

589 

590 

def isabsuri(uristring):
    """Return :const:`True` if `uristring` is an absolute URI."""
    parts = urisplit(uristring)
    return parts.isabsuri()

594 

595 

def isnetpath(uristring):
    """Return :const:`True` if `uristring` is a network-path reference."""
    parts = urisplit(uristring)
    return parts.isnetpath()

599 

600 

def isabspath(uristring):
    """Return :const:`True` if `uristring` is an absolute-path reference."""
    parts = urisplit(uristring)
    return parts.isabspath()

604 

605 

def isrelpath(uristring):
    """Return :const:`True` if `uristring` is a relative-path reference."""
    parts = urisplit(uristring)
    return parts.isrelpath()

609 

610 

def issamedoc(uristring):
    """Return :const:`True` if `uristring` is a same-document reference."""
    parts = urisplit(uristring)
    return parts.issamedoc()

614 

615 

# RFC 3986 3.1: scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
_SCHEME_RE = re.compile(rb"^[A-Za-z][A-Za-z0-9+.-]*$")

# RFC 3986 3.2: authority = [ userinfo "@" ] host [ ":" port ]
# Groups: (userinfo, host, port); the same pattern in bytes and str flavour.
_AUTHORITY_RE_BYTES = re.compile(rb"^(?:(.*)@)?(.*?)(?::([0-9]*))?$")
_AUTHORITY_RE_STR = re.compile(r"^(?:(.*)@)?(.*?)(?::([0-9]*))?$")

# Characters left unescaped (besides the unreserved set) per component.
_SAFE_USERINFO = SUB_DELIMS + ":"
_SAFE_HOST = SUB_DELIMS
_SAFE_PATH = SUB_DELIMS + ":@/"
_SAFE_QUERY = _SAFE_PATH + "?"
_SAFE_FRAGMENT = _SAFE_QUERY

629 

630 

def _scheme(scheme):
    """Return the bytes `scheme` lowercased, or raise :exc:`ValueError`
    if it does not match the RFC 3986 scheme pattern.

    """
    if _SCHEME_RE.match(scheme) is None:
        raise ValueError("Invalid scheme component")
    return scheme.lower()

636 

637 

def _authority(userinfo, host, port, encoding):
    """Build the encoded authority as bytes from its subcomponents.

    Returns :const:`None` when all three subcomponents are :const:`None`.
    Raises :exc:`TypeError` for a host that is neither a string type nor
    an :mod:`ipaddress` address object.

    """
    parts = []

    if userinfo is not None:
        parts += [uriencode(userinfo, _SAFE_USERINFO, encoding), b"@"]

    if host is None:
        pass
    elif isinstance(host, ipaddress.IPv6Address):
        parts.append(b"[" + host.compressed.encode() + b"]")
    elif isinstance(host, ipaddress.IPv4Address):
        parts.append(host.compressed.encode())
    elif isinstance(host, bytes):
        parts.append(_host(host))
    elif isinstance(host, str):
        parts.append(_host(host.encode("utf-8")))
    else:
        raise TypeError("Invalid host type: %r" % type(host))

    if port is not None:
        # Normalize numbers and text to bytes before validation.
        if isinstance(port, numbers.Number):
            port = str(port).encode()
        elif not isinstance(port, bytes):
            port = port.encode()
        parts.append(_port(port))

    if not parts:
        return None
    return b"".join(parts)

664 

665 

666def _ip_literal(address): 

667 if address.startswith("v"): 

668 raise ValueError("Address mechanism not supported") 

669 else: 

670 return b"[" + ipaddress.IPv6Address(address).compressed.encode() + b"]" 

671 

672 

def _host(host):
    """Return the normalized, encoded form of the bytes `host`."""
    # RFC 3986 3.2.3: Although host is case-insensitive, producers and
    # normalizers should use lowercase for registered names and
    # hexadecimal addresses for the sake of uniformity, while only
    # using uppercase letters for percent-encodings.
    is_bracketed = host.startswith(b"[") and host.endswith(b"]")
    if is_bracketed:
        return _ip_literal(host[1:-1].decode())
    # check for IPv6 addresses as returned by SplitResult.gethost()
    try:
        literal = _ip_literal(host.decode("utf-8"))
    except ValueError:
        # not an IPv6 address: encode as a lowercased registered name
        return uriencode(host.lower(), _SAFE_HOST, "utf-8")
    return literal

685 

686 

687def _port(port): 

688 # RFC 3986 3.2.3: URI producers and normalizers should omit the 

689 # port component and its ":" delimiter if port is empty or if its 

690 # value would be the same as that of the scheme's default. 

691 if port.lstrip(b"0123456789"): 

692 raise ValueError("Invalid port subcomponent") 

693 elif port: 

694 return b":" + port 

695 else: 

696 return b"" 

697 

698 

def _querylist(items, sep, encoding):
    """Encode an iterable of ``(key, value)`` pairs as a bytes query string.

    A :const:`None` value yields the bare name; values that are neither
    :class:`bytes` nor :class:`str` are converted with :func:`str`.  The
    separator `sep` is always percent-encoded inside names and values.

    """
    safe = _SAFE_QUERY.replace(sep, "")
    terms = []
    for key, value in items:
        term = uriencode(key, safe, encoding)
        if value is not None:
            if not isinstance(value, (bytes, str)):
                value = str(value)
            term = term + b"=" + uriencode(value, safe, encoding)
        terms.append(term)
    return sep.encode("ascii").join(terms)

712 

713 

def _querydict(mapping, sep, encoding):
    """Flatten `mapping` into ``(key, value)`` pairs and encode them with
    :func:`_querylist`.

    A non-string iterable value contributes one pair per element.

    """
    pairs = []
    for key, value in mapping.items():
        is_multi = not isinstance(value, (bytes, str)) and isinstance(
            value, collections.abc.Iterable
        )
        if is_multi:
            pairs.extend((key, item) for item in value)
        else:
            pairs.append((key, value))
    return _querylist(pairs, sep, encoding)

724 

725 

def uricompose(
    scheme=None,
    authority=None,
    path="",
    query=None,
    fragment=None,
    userinfo=None,
    host=None,
    port=None,
    querysep="&",
    encoding="utf-8",
):
    """Compose a URI reference string from its individual components.

    `authority` may be a string or a three-item ``(userinfo, host, port)``
    sequence; the `userinfo`, `host` and `port` arguments override the
    corresponding part when they are not :const:`None`.  `query` may be a
    string, a mapping, or an iterable of ``(name, value)`` pairs.  The
    result is always a :class:`str`.

    """

    # RFC 3986 3.1: schemes are case-insensitive, but only the canonical
    # lowercase form is produced; _scheme() validates and lowercases.
    if scheme is not None:
        scheme = _scheme(scheme if isinstance(scheme, bytes) else scheme.encode())

    # authority must be string type or three-item sequence
    if authority is None:
        authority = (None, None, None)
    elif isinstance(authority, bytes):
        authority = _AUTHORITY_RE_BYTES.match(authority).groups()
    elif isinstance(authority, str):
        authority = _AUTHORITY_RE_STR.match(authority).groups()
    elif not isinstance(authority, collections.abc.Sequence):
        raise TypeError("Invalid authority type")
    elif len(authority) != 3:
        raise ValueError("Invalid authority length")
    # explicit subcomponent arguments take precedence over `authority`
    if userinfo is None:
        userinfo = authority[0]
    if host is None:
        host = authority[1]
    if port is None:
        port = authority[2]
    authority = _authority(userinfo, host, port, encoding)

    # RFC 3986 3.3: with an authority, the path must be empty or begin
    # with "/"; without one, the path cannot begin with "//".
    path = uriencode(path, _SAFE_PATH, encoding)
    if authority is None:
        if path.startswith(b"//"):
            raise ValueError("Invalid path without authority component")
    elif path and not path.startswith(b"/"):
        raise ValueError("Invalid path with authority component")

    # RFC 3986 4.2: a first segment containing ":" in a relative-path
    # reference would be mistaken for a scheme name, so it is preceded
    # by a dot-segment ("./this:that").
    if scheme is None and authority is None and not path.startswith(b"/"):
        first_segment = path.partition(b"/")[0]
        if b":" in first_segment:
            path = b"./" + path

    # RFC 3986 3.4: "/" and "?" may represent data within the query
    # component and are therefore left unencoded.
    if query is None:
        pass
    elif isinstance(query, (bytes, str)):
        query = uriencode(query, _SAFE_QUERY, encoding)
    elif isinstance(query, collections.abc.Mapping):
        query = _querydict(query, querysep, encoding)
    elif isinstance(query, collections.abc.Iterable):
        query = _querylist(query, querysep, encoding)
    else:
        raise TypeError("Invalid query type")

    # RFC 3986 3.5: "/" and "?" are likewise allowed as data within the
    # fragment identifier.
    if fragment is not None:
        fragment = uriencode(fragment, _SAFE_FRAGMENT, encoding)

    # all components are bytes (or None) here; return the reference as `str`
    composed = uriunsplit((scheme, authority, path, query, fragment))
    return composed.decode()