Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/rekor/__init__.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

39 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16APIs for interacting with Rekor. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import typing 

23from abc import ABC, abstractmethod 

24 

25import rekor_types 

26import requests 

27from cryptography.x509 import Certificate 

28 

29from sigstore._utils import base64_encode_pem_cert 

30from sigstore.dsse import Envelope 

31from sigstore.hashes import Hashed 

32 

33if typing.TYPE_CHECKING: 

34 from sigstore.models import LogEntry 

35 

# Names exported by a star-import of this module.
__all__ = ["_hashedrekord_from_parts"]

# Distinct static type for a request body that is ready to be submitted to
# Rekor; the underlying value is a plain `dict[str, Any]`.
EntryRequestBody = typing.NewType("EntryRequestBody", dict[str, typing.Any])

41 

42 

class RekorClientError(Exception):
    """
    Raised when a request made by the Rekor client fails.

    The exception message is derived from the HTTP error that caused it.
    """

    def __init__(self, http_error: requests.HTTPError):
        """
        Build the exception message from the given `requests.HTTPError`.

        If the error carries a response, its body is parsed as a
        `rekor_types.Error` and reported as `<code>: <message>`. If parsing
        fails for any reason, only the HTTP status code is reported. If there
        is no response at all, the HTTP error itself is rendered.
        """
        response = http_error.response
        if response is None:
            message = f"Unexpected Rekor error: {http_error}"
        else:
            try:
                parsed = rekor_types.Error.model_validate_json(response.text)
                message = f"{parsed.code}: {parsed.message}"
            except Exception:
                # Best effort: whatever went wrong while decoding the body,
                # fall back to a message that only needs the status code.
                message = f"Rekor returned an unknown error with HTTP {response.status_code}"
        super().__init__(message)

62 

63 

class RekorLogSubmitter(ABC):
    """
    Abstract class to represent a Rekor log entry submitter.

    Intended to be implemented by RekorClient and RekorV2Client.

    Subclasses must implement `create_entry` plus the two request-building
    class methods; the class cannot be instantiated until all three are
    overridden.
    """

    @abstractmethod
    def create_entry(
        self,
        request: EntryRequestBody,
    ) -> LogEntry:
        """
        Submit the request to Rekor.

        `request` is an `EntryRequestBody`, the type returned by the
        request-building class methods of this interface. Returns the
        resulting `LogEntry`.
        """

    @classmethod
    @abstractmethod
    def _build_hashed_rekord_request(
        cls, hashed_input: Hashed, signature: bytes, certificate: Certificate
    ) -> EntryRequestBody:
        """
        Construct a hashed rekord request to submit to Rekor.

        Takes the hashed input, the signature bytes and the certificate, and
        returns an `EntryRequestBody`.
        """

    @classmethod
    @abstractmethod
    def _build_dsse_request(
        cls, envelope: Envelope, certificate: Certificate
    ) -> EntryRequestBody:
        """
        Construct a dsse request to submit to Rekor.

        Takes the DSSE envelope and the certificate, and returns an
        `EntryRequestBody`.
        """

100 

101 

# TODO: This should probably live somewhere better.
def _hashedrekord_from_parts(
    cert: Certificate, sig: bytes, hashed: Hashed
) -> rekor_types.Hashedrekord:
    """
    Assemble a `rekor_types.Hashedrekord` from its constituent parts.

    The signature bytes are base64-encoded, the certificate is encoded with
    `base64_encode_pem_cert`, and the digest of `hashed` is hex-encoded
    alongside the algorithm reported by `hashed`.
    """
    schema = rekor_types.hashedrekord

    # Signature section: the encoded signature plus the signing certificate.
    signature = schema.Signature(
        content=base64.b64encode(sig).decode(),
        public_key=schema.PublicKey(content=base64_encode_pem_cert(cert)),
    )

    # Data section: the algorithm and hex digest of the signed input.
    data = schema.Data(
        hash=schema.Hash(
            algorithm=hashed._as_hashedrekord_algorithm(),
            value=hashed.digest.hex(),
        )
    )

    return rekor_types.Hashedrekord(
        spec=schema.HashedrekordV001Schema(signature=signature, data=data)
    )