Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/rfc3986/parseresult.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

200 statements  

1# Copyright (c) 2015 Ian Stapleton Cordasco 

2# Licensed under the Apache License, Version 2.0 (the "License"); 

3# you may not use this file except in compliance with the License. 

4# You may obtain a copy of the License at 

5# 

6# http://www.apache.org/licenses/LICENSE-2.0 

7# 

8# Unless required by applicable law or agreed to in writing, software 

9# distributed under the License is distributed on an "AS IS" BASIS, 

10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

11# implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Module containing the urlparse compatibility logic.""" 

15import typing as t 

16from collections import namedtuple 

17 

18from . import compat 

19from . import exceptions 

20from . import misc 

21from . import normalizers 

22from . import uri 

23from ._typing_compat import Self as _Self 

24 

# Public names exported by ``from <this module> import *``.
__all__ = ("ParseResult", "ParseResultBytes")

# Field names, in positional order, of the named tuples that back
# ParseResult and ParseResultBytes.  copy_with() and encode() zip values
# against this tuple, so the order here must match the order in which the
# classes pass components to the namedtuple constructor.
PARSED_COMPONENTS = (
    "scheme",
    "userinfo",
    "host",
    "port",
    "path",
    "query",
    "fragment",
)

36 

37 

class ParseResultMixin(t.Generic[t.AnyStr]):
    """Behaviour shared by the text and bytes parse-result classes.

    Concrete subclasses supply the ``userinfo``, ``host``, ``port``,
    ``query`` and ``encoding`` attributes, an ``authority`` property and
    an ``unsplit()`` method; this mixin layers the authority-rebuilding
    helper and the standard-library-compatible aliases on top of them.
    """

    if t.TYPE_CHECKING:
        # Declarations for static type checkers only; the real values are
        # provided by the concrete subclasses at runtime.
        userinfo: t.Optional[t.AnyStr]
        host: t.Optional[t.AnyStr]
        port: t.Optional[int]
        query: t.Optional[t.AnyStr]
        encoding: str

        @property
        def authority(self) -> t.Optional[t.AnyStr]: ...

    def _generate_authority(
        self,
        attributes: t.Dict[str, t.Optional[t.AnyStr]],
    ) -> t.Optional[str]:
        """Return the authority string matching ``attributes``.

        :param attributes: Mapping that contains at least the keys
            ``"userinfo"``, ``"host"`` and ``"port"``.
        :returns: A freshly normalized authority when any of the three
            values differs from the one stored on this instance;
            otherwise the instance's current authority as text (or
            ``None`` when it has none).
        """
        # I swear I did not align the comparisons below. That's just how they
        # happened to align based on pep8 and attribute lengths.
        userinfo, host, port = (
            attributes[p] for p in ("userinfo", "host", "port")
        )
        if self.userinfo != userinfo or self.host != host or self.port != port:
            if port:
                # The normalizer concatenates the port as text.  A bytes
                # port must be decoded: formatting it with an f-string
                # would embed its repr ("b'8080'") in the authority.
                if isinstance(port, bytes):
                    port = port.decode(self.encoding)
                else:
                    port = f"{port}"
            return normalizers.normalize_authority(
                (
                    compat.to_str(userinfo, self.encoding),
                    compat.to_str(host, self.encoding),
                    port,
                )
            )
        current = self.authority
        if isinstance(current, bytes):
            # Decode with the instance's own encoding rather than assuming
            # UTF-8, so the round trip works for every configured encoding.
            return current.decode(self.encoding)
        return current

    def geturl(self) -> t.AnyStr:
        """Shim to match the standard library method."""
        return self.unsplit()

    @property
    def hostname(self) -> t.Optional[t.AnyStr]:
        """Shim to match the standard library; alias of ``host``."""
        return self.host

    @property
    def netloc(self) -> t.Optional[t.AnyStr]:
        """Shim to match the standard library; alias of ``authority``."""
        return self.authority

    @property
    def params(self) -> t.Optional[t.AnyStr]:
        """Shim to match the standard library; returns ``query``."""
        return self.query

90 

91 

class ParseResult(
    namedtuple("ParseResult", PARSED_COMPONENTS), ParseResultMixin[str]
):
    """Text counterpart of the standard library's ``ParseResult``.

    The seven URI components are stored as named-tuple fields, while the
    ``URIReference`` the result was built from is kept on the
    ``reference`` attribute and is used to render the URI.
    """

    scheme: t.Optional[str]
    userinfo: t.Optional[str]
    host: t.Optional[str]
    port: t.Optional[int]
    path: t.Optional[str]
    query: t.Optional[str]
    fragment: t.Optional[str]
    encoding: str
    reference: "uri.URIReference"

    def __new__(
        cls,
        scheme: t.Optional[str],
        userinfo: t.Optional[str],
        host: t.Optional[str],
        port: t.Optional[int],
        path: t.Optional[str],
        query: t.Optional[str],
        fragment: t.Optional[str],
        uri_ref: "uri.URIReference",
        encoding: str = "utf-8",
    ) -> _Self:
        """Build the tuple and attach the reference and encoding to it.

        Falsy ``scheme``, ``userinfo``, ``port`` and ``path`` values are
        stored as ``None``; ``host``, ``query`` and ``fragment`` are
        stored exactly as given.
        """
        instance = super().__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query,
            fragment,
        )
        instance.reference = uri_ref
        instance.encoding = encoding
        return instance

    @classmethod
    def from_parts(
        cls,
        scheme: t.Optional[str] = None,
        userinfo: t.Optional[str] = None,
        host: t.Optional[str] = None,
        port: t.Optional[t.Union[int, str]] = None,
        path: t.Optional[str] = None,
        query: t.Optional[str] = None,
        fragment: t.Optional[str] = None,
        encoding: str = "utf-8",
    ) -> _Self:
        """Assemble a ParseResult from individual components.

        The authority is composed from ``userinfo``, ``host`` and
        ``port``, a ``URIReference`` is created and normalized, and the
        resulting instance is populated from that normalized reference.
        """
        userinfo_part = "" if userinfo is None else userinfo + "@"
        host_part = "" if host is None else host
        port_part = "" if port is None else f":{port}"
        normalized = uri.URIReference(
            scheme=scheme,
            authority=userinfo_part + host_part + port_part,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        ).normalize()
        # Re-derive the authority pieces from the normalized reference so
        # that the stored fields agree with it.
        userinfo, host, port = authority_from(normalized, strict=True)
        return cls(
            scheme=normalized.scheme,
            userinfo=userinfo,
            host=host,
            port=port,
            path=normalized.path,
            query=normalized.query,
            fragment=normalized.fragment,
            uri_ref=normalized,
            encoding=encoding,
        )

    @classmethod
    def from_string(
        cls,
        uri_string: t.Union[str, bytes],
        encoding: str = "utf-8",
        strict: bool = True,
        lazy_normalize: bool = True,
    ) -> _Self:
        """Parse ``uri_string`` into a ParseResult.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResult` or subclass thereof
        """
        parsed = uri.URIReference.from_string(uri_string, encoding)
        reference = parsed if lazy_normalize else parsed.normalize()
        userinfo, host, port = authority_from(reference, strict)
        return cls(
            scheme=reference.scheme,
            userinfo=userinfo,
            host=host,
            port=port,
            path=reference.path,
            query=reference.query,
            fragment=reference.fragment,
            uri_ref=reference,
            encoding=encoding,
        )

    @property
    def authority(self) -> t.Optional[str]:
        """Return the authority held by the underlying reference."""
        return self.reference.authority

    def copy_with(
        self,
        scheme: t.Optional[str] = misc.UseExisting,
        userinfo: t.Optional[str] = misc.UseExisting,
        host: t.Optional[str] = misc.UseExisting,
        port: t.Optional[t.Union[int, str]] = misc.UseExisting,
        path: t.Optional[str] = misc.UseExisting,
        query: t.Optional[str] = misc.UseExisting,
        fragment: t.Optional[str] = misc.UseExisting,
    ) -> "ParseResult":
        """Return a new ParseResult with the given components replaced.

        Any argument left at ``misc.UseExisting`` keeps this instance's
        current value for that component.
        """
        supplied = (scheme, userinfo, host, port, path, query, fragment)
        attrs_dict: t.Dict[str, t.Optional[str]] = {
            name: getattr(self, name) if value is misc.UseExisting else value
            for name, value in zip(PARSED_COMPONENTS, supplied)
        }
        new_reference = self.reference.copy_with(
            scheme=attrs_dict["scheme"],
            authority=self._generate_authority(attrs_dict),
            path=attrs_dict["path"],
            query=attrs_dict["query"],
            fragment=attrs_dict["fragment"],
        )
        return ParseResult(
            uri_ref=new_reference, encoding=self.encoding, **attrs_dict
        )

    def encode(self, encoding: t.Optional[str] = None) -> "ParseResultBytes":
        """Convert to an instance of ParseResultBytes.

        :param encoding: Encoding to use; falls back to this instance's
            own encoding when omitted or empty.
        """
        encoding = encoding or self.encoding
        # Components without an ``encode`` method (``None`` and the
        # integer port) are carried over untouched.
        encoded = {
            name: value.encode(encoding) if hasattr(value, "encode") else value
            for name, value in zip(PARSED_COMPONENTS, self)
        }
        return ParseResultBytes(
            uri_ref=self.reference, encoding=encoding, **encoded
        )

    def unsplit(self, use_idna: bool = False) -> str:
        """Create a URI string from the components.

        :param bool use_idna: When True and a host is present, the host
            is IDNA-encoded before the URI is rendered.
        :returns: The parsed URI reconstituted as a string.
        :rtype: str
        """
        if not (use_idna and self.host):
            return self.reference.unsplit()
        idna_host = self.host.encode("idna").decode(self.encoding)
        return self.copy_with(host=idna_host).reference.unsplit()

276 

277 

class ParseResultBytes(
    namedtuple("ParseResultBytes", PARSED_COMPONENTS), ParseResultMixin[bytes]
):
    """Compatibility shim for the urlparse.ParseResultBytes object.

    The seven components are stored as ``bytes`` (the port as ``int``),
    while the backing ``URIReference`` kept on ``reference`` holds text.
    ``encoding`` is used to convert between the two representations.
    """

    scheme: t.Optional[bytes]
    userinfo: t.Optional[bytes]
    host: t.Optional[bytes]
    port: t.Optional[int]
    path: t.Optional[bytes]
    query: t.Optional[bytes]
    fragment: t.Optional[bytes]
    encoding: str
    reference: "uri.URIReference"
    lazy_normalize: bool

    def __new__(
        cls,
        scheme: t.Optional[bytes],
        userinfo: t.Optional[bytes],
        host: t.Optional[bytes],
        port: t.Optional[int],
        path: t.Optional[bytes],
        query: t.Optional[bytes],
        fragment: t.Optional[bytes],
        uri_ref: "uri.URIReference",
        encoding: str = "utf-8",
        lazy_normalize: bool = True,
    ) -> _Self:
        """Create a new ParseResultBytes instance.

        Every falsy component except ``host`` is stored as ``None``;
        ``host`` is stored exactly as given.
        """
        parse_result = super().__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query or None,
            fragment or None,
        )
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        parse_result.lazy_normalize = lazy_normalize
        return parse_result

    @classmethod
    def from_parts(
        cls,
        scheme: t.Optional[str] = None,
        userinfo: t.Optional[str] = None,
        host: t.Optional[str] = None,
        port: t.Optional[t.Union[int, str]] = None,
        path: t.Optional[str] = None,
        query: t.Optional[str] = None,
        fragment: t.Optional[str] = None,
        encoding: str = "utf-8",
        lazy_normalize: bool = True,
    ) -> _Self:
        """Create a ParseResultBytes instance from its (text) parts.

        :param bool lazy_normalize: When False the reference built from
            the parts is normalized immediately.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        authority = ""
        if userinfo is not None:
            authority += userinfo + "@"
        if host is not None:
            authority += host
        if port is not None:
            # int() rejects a non-numeric port before it reaches the
            # authority string.
            authority += f":{int(port)}"
        uri_ref = uri.URIReference(
            scheme=scheme,
            authority=authority,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        )
        if not lazy_normalize:
            uri_ref = uri_ref.normalize()
        to_bytes = compat.to_bytes
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(
            scheme=to_bytes(scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(path, encoding),
            query=to_bytes(query, encoding),
            fragment=to_bytes(fragment, encoding),
            uri_ref=uri_ref,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @classmethod
    def from_string(
        cls,
        uri_string: t.Union[str, bytes],
        encoding: str = "utf-8",
        strict: bool = True,
        lazy_normalize: bool = True,
    ) -> _Self:
        """Parse a URI from the given unicode URI string.

        :param str uri_string: Unicode URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)

        to_bytes = compat.to_bytes
        return cls(
            scheme=to_bytes(reference.scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(reference.path, encoding),
            query=to_bytes(reference.query, encoding),
            fragment=to_bytes(reference.fragment, encoding),
            uri_ref=reference,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @property
    def authority(self) -> t.Optional[bytes]:
        """Return the reference's authority encoded as bytes.

        Returns ``None`` when the reference has no authority instead of
        failing on ``None.encode``; ``copy_with()`` and ``unsplit()``
        reach this property for such references.
        """
        authority = self.reference.authority
        if authority is None:
            return None
        return authority.encode(self.encoding)

    def copy_with(
        self,
        scheme: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        userinfo: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        host: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        port: t.Optional[t.Union[int, str, bytes]] = misc.UseExisting,
        path: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        query: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        fragment: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        lazy_normalize: bool = True,
    ) -> "ParseResultBytes":
        """Create a copy of this instance replacing with specified parts.

        Arguments left at ``misc.UseExisting`` keep the current value.
        Text values are encoded with this instance's encoding before
        being stored.

        :param bool lazy_normalize: When False the new reference is
            normalized before the copy is returned.
        """
        attributes = zip(
            PARSED_COMPONENTS,
            (scheme, userinfo, host, port, path, query, fragment),
        )
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            # Encode text; bytes, ints and None pass through unchanged.
            if not isinstance(value, bytes) and hasattr(value, "encode"):
                value = value.encode(self.encoding)
            attrs_dict[name] = value

        if t.TYPE_CHECKING:
            attrs_dict = t.cast(t.Dict[str, t.Optional[bytes]], attrs_dict)

        authority = self._generate_authority(attrs_dict)
        # The reference stores text, so convert the components back.
        to_str = compat.to_str
        ref = self.reference.copy_with(
            scheme=to_str(attrs_dict["scheme"], self.encoding),
            authority=to_str(authority, self.encoding),
            path=to_str(attrs_dict["path"], self.encoding),
            query=to_str(attrs_dict["query"], self.encoding),
            fragment=to_str(attrs_dict["fragment"], self.encoding),
        )
        if not lazy_normalize:
            ref = ref.normalize()
        return ParseResultBytes(
            uri_ref=ref,
            encoding=self.encoding,
            lazy_normalize=lazy_normalize,
            **attrs_dict,
        )

    def unsplit(self, use_idna: bool = False) -> bytes:
        """Create a URI bytes object from the components.

        :param bool use_idna: When True and a host is present, the host
            is IDNA-encoded before the URI is rendered.
        :returns: The parsed URI reconstituted as bytes.
        :rtype: bytes
        """
        parse_result = self
        if use_idna and self.host:
            # self.host is bytes, to encode to idna, we need to decode it
            # first
            host = self.host.decode(self.encoding)
            hostbytes = host.encode("idna")
            parse_result = self.copy_with(host=hostbytes)
        if self.lazy_normalize:
            # Normalization was deferred when this instance was created;
            # apply it now, before rendering.
            parse_result = parse_result.copy_with(lazy_normalize=False)
        # Named ``uri_string`` so the module-level ``uri`` import is not
        # shadowed inside this method.
        uri_string = parse_result.reference.unsplit()
        return uri_string.encode(self.encoding)

472 

473 

def split_authority(
    authority: str,
) -> t.Tuple[t.Optional[str], t.Optional[str], t.Optional[str]]:
    """Leniently split an authority into ``(userinfo, host, port)``.

    No validation is performed; every piece is returned as text, or as
    ``None`` when it is absent.

    :param str authority: The authority component of a URI.
    :returns: Tuple of ``(userinfo, host, port)``.
    """
    userinfo = host = port = None
    # Holds the text before the port separator when it may be the host.
    extra_host = None
    # Everything is "rest" unless a userinfo portion is peeled off below.
    rest = authority

    if "@" in authority:
        # Split on the last "@": userinfo may itself contain "@".
        userinfo, rest = authority.rsplit("@", 1)

    # Handle IPv6 (bracketed) host addresses.
    if rest.startswith("["):
        literal, closing, remainder = rest.partition("]")
        if closing:
            host, rest = literal + "]", remainder
        else:
            # Unterminated literal: keep it verbatim as the host rather
            # than failing with an unpacking ValueError.
            host, rest = rest, ""

    if ":" in rest:
        extra_host, port = rest.split(":", 1)
    elif not host and rest:
        host = rest

    if extra_host and not host:
        host = extra_host

    return userinfo, host, port

501 

502 

def authority_from(
    reference: "uri.URIReference",
    strict: bool,
) -> t.Tuple[t.Optional[str], t.Optional[str], t.Optional[int]]:
    """Extract ``(userinfo, host, port)`` from ``reference``.

    :param reference: Object exposing ``authority_info()`` and
        ``authority``.
    :param bool strict: When True an ``InvalidAuthority`` raised by
        ``authority_info()`` propagates; when False the authority is
        split leniently with ``split_authority`` instead.
    :returns: The three pieces, with a non-empty port converted to int.
    :raises exceptions.InvalidPort: If a non-empty port is not made up
        solely of ASCII digits.
    """
    try:
        subauthority = reference.authority_info()
    except exceptions.InvalidAuthority:
        if strict:
            raise
        userinfo, host, port = split_authority(reference.authority)
    else:
        # Thanks to Richard Barrell for this idea:
        # https://twitter.com/0x2ba22e11/status/617338811975139328
        userinfo, host, port = (
            subauthority.get(key) for key in ("userinfo", "host", "port")
        )

    # An absent or empty port is handed back untouched.
    if not port:
        return userinfo, host, port
    if not (port.isascii() and port.isdigit()):
        raise exceptions.InvalidPort(port)
    return userinfo, host, int(port)