1# Copyright 2022 The Sigstore Authors 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15""" 
    16Client implementation for interacting with Fulcio. 
    17""" 
    18 
    19from __future__ import annotations 
    20 
    21import base64 
    22import json 
    23import logging 
    24from abc import ABC 
    25from dataclasses import dataclass 
    26from urllib.parse import urljoin 
    27 
    28import requests 
    29from cryptography.hazmat.primitives import serialization 
    30from cryptography.x509 import ( 
    31    Certificate, 
    32    CertificateSigningRequest, 
    33    load_pem_x509_certificate, 
    34) 
    35 
    36from sigstore._internal import USER_AGENT 
    37from sigstore._utils import B64Str 
    38from sigstore.oidc import IdentityToken 
    39 
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# URL paths of the Fulcio v2 REST endpoints used by this client; they are
# joined onto the client's base URL when an endpoint model is created.
SIGNING_CERT_ENDPOINT = "/api/v2/signingCert"
TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle"
    44 
    45 
    46class ExpiredCertificate(Exception): 
    47    """An error raised when the Certificate is expired.""" 
    48 
    49 
    50@dataclass(frozen=True) 
    51class FulcioCertificateSigningResponse: 
    52    """Certificate response""" 
    53 
    54    cert: Certificate 
    55    chain: list[Certificate] 
    56 
    57 
    58@dataclass(frozen=True) 
    59class FulcioTrustBundleResponse: 
    60    """Trust bundle response, containing a list of certificate chains""" 
    61 
    62    trust_bundle: list[list[Certificate]] 
    63 
    64 
    65class FulcioClientError(Exception): 
    66    """ 
    67    Raised on any error in the Fulcio client. 
    68    """ 
    69 
    70    pass 
    71 
    72 
    73class _Endpoint(ABC): 
    74    def __init__(self, url: str, session: requests.Session) -> None: 
    75        self.url = url 
    76        self.session = session 
    77 
    78 
    79def _serialize_cert_request(req: CertificateSigningRequest) -> str: 
    80    data = { 
    81        "certificateSigningRequest": B64Str( 
    82            base64.b64encode(req.public_bytes(serialization.Encoding.PEM)).decode() 
    83        ) 
    84    } 
    85    return json.dumps(data) 
    86 
    87 
    88class FulcioSigningCert(_Endpoint): 
    89    """ 
    90    Fulcio REST API signing certificate functionality. 
    91    """ 
    92 
    93    def post( 
    94        self, req: CertificateSigningRequest, identity: IdentityToken 
    95    ) -> FulcioCertificateSigningResponse: 
    96        """ 
    97        Get the signing certificate, using an X.509 Certificate 
    98        Signing Request. 
    99        """ 
    100        headers = { 
    101            "Authorization": f"Bearer {identity}", 
    102            "Content-Type": "application/json", 
    103            "Accept": "application/pem-certificate-chain", 
    104        } 
    105        resp: requests.Response = self.session.post( 
    106            url=self.url, data=_serialize_cert_request(req), headers=headers 
    107        ) 
    108        try: 
    109            resp.raise_for_status() 
    110        except requests.HTTPError as http_error: 
    111            # See if we can optionally add a message 
    112            if http_error.response: 
    113                text = json.loads(http_error.response.text) 
    114                if "message" in http_error.response.text: 
    115                    raise FulcioClientError(text["message"]) from http_error 
    116            raise FulcioClientError from http_error 
    117 
    118        try: 
    119            certificates = resp.json()["signedCertificateEmbeddedSct"]["chain"][ 
    120                "certificates" 
    121            ] 
    122        except KeyError: 
    123            raise FulcioClientError("Fulcio response missing certificate chain") 
    124 
    125        # Cryptography doesn't have chain verification/building built in 
    126        # https://github.com/pyca/cryptography/issues/2381 
    127        if len(certificates) < 2: 
    128            raise FulcioClientError( 
    129                f"Certificate chain is too short: {len(certificates)} < 2" 
    130            ) 
    131        cert = load_pem_x509_certificate(certificates[0].encode()) 
    132        chain = [load_pem_x509_certificate(c.encode()) for c in certificates[1:]] 
    133 
    134        return FulcioCertificateSigningResponse(cert, chain) 
    135 
    136 
    137class FulcioTrustBundle(_Endpoint): 
    138    """ 
    139    Fulcio REST API trust bundle functionality. 
    140    """ 
    141 
    142    def get(self) -> FulcioTrustBundleResponse: 
    143        """Get the certificate chains from Fulcio""" 
    144        resp: requests.Response = self.session.get(self.url) 
    145        try: 
    146            resp.raise_for_status() 
    147        except requests.HTTPError as http_error: 
    148            raise FulcioClientError from http_error 
    149 
    150        trust_bundle_json = resp.json() 
    151        chains: list[list[Certificate]] = [] 
    152        for certificate_chain in trust_bundle_json["chains"]: 
    153            chain: list[Certificate] = [] 
    154            for certificate in certificate_chain["certificates"]: 
    155                cert: Certificate = load_pem_x509_certificate(certificate.encode()) 
    156                chain.append(cert) 
    157            chains.append(chain) 
    158        return FulcioTrustBundleResponse(chains) 
    159 
    160 
    161class FulcioClient: 
    162    """The internal Fulcio client""" 
    163 
    164    def __init__(self, url: str) -> None: 
    165        """Initialize the client""" 
    166        _logger.debug(f"Fulcio client using URL: {url}") 
    167        self.url = url 
    168        self.session = requests.Session() 
    169        self.session.headers.update( 
    170            { 
    171                "User-Agent": USER_AGENT, 
    172            } 
    173        ) 
    174 
    175    def __del__(self) -> None: 
    176        """ 
    177        Destroys the underlying network session. 
    178        """ 
    179        self.session.close() 
    180 
    181    @property 
    182    def signing_cert(self) -> FulcioSigningCert: 
    183        """ 
    184        Returns a model capable of interacting with Fulcio's signing certificate endpoints. 
    185        """ 
    186        return FulcioSigningCert( 
    187            urljoin(self.url, SIGNING_CERT_ENDPOINT), session=self.session 
    188        ) 
    189 
    190    @property 
    191    def trust_bundle(self) -> FulcioTrustBundle: 
    192        """ 
    193        Returns a model capable of interacting with Fulcio's trust bundle endpoints. 
    194        """ 
    195        return FulcioTrustBundle( 
    196            urljoin(self.url, TRUST_BUNDLE_ENDPOINT), session=self.session 
    197        )