1# Copyright 2022 The Sigstore Authors 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15""" 
    16Utilities to deal with sources of signed time. 
    17""" 
    18 
    19import enum 
    20from dataclasses import dataclass 
    21from datetime import datetime 
    22 
    23import requests 
    24from rfc3161_client import ( 
    25    TimestampRequestBuilder, 
    26    TimeStampResponse, 
    27    decode_timestamp_response, 
    28) 
    29from rfc3161_client.base import HashAlgorithm 
    30 
    31from sigstore._internal import USER_AGENT 
    32 
# Timeout, in seconds, handed to `requests` for the HTTP POST sent to the TSA.
CLIENT_TIMEOUT: int = 5
    34 
    35 
    36class TimestampSource(enum.Enum): 
    37    """Represents the source of a timestamp.""" 
    38 
    39    TIMESTAMP_AUTHORITY = enum.auto() 
    40    TRANSPARENCY_SERVICE = enum.auto() 
    41 
    42 
    43@dataclass 
    44class TimestampVerificationResult: 
    45    """Represents a timestamp used by the Verifier. 
    46 
    47    A Timestamp either comes from a Timestamping Service (RFC3161) or the Transparency 
    48    Service. 
    49    """ 
    50 
    51    source: TimestampSource 
    52    time: datetime 
    53 
    54 
    55class TimestampError(Exception): 
    56    """ 
    57    A generic error in the TimestampAuthority client. 
    58    """ 
    59 
    60    pass 
    61 
    62 
    63class TimestampAuthorityClient: 
    64    """Internal client to deal with a Timestamp Authority""" 
    65 
    66    def __init__(self, url: str) -> None: 
    67        """ 
    68        Create a new `TimestampAuthorityClient` from the given URL. 
    69        """ 
    70        self.url = url 
    71 
    72    def request_timestamp(self, signature: bytes) -> TimeStampResponse: 
    73        """ 
    74        Timestamp the signature using the configured Timestamp Authority. 
    75 
    76        This method generates a RFC3161 Timestamp Request and sends it to a TSA. 
    77        The received response is parsed but *not* cryptographically verified. 
    78 
    79        Raises a TimestampError on failure. 
    80        """ 
    81        # Build the timestamp request 
    82        try: 
    83            timestamp_request = ( 
    84                TimestampRequestBuilder() 
    85                .hash_algorithm(HashAlgorithm.SHA256) 
    86                .data(signature) 
    87                .nonce(nonce=True) 
    88                .build() 
    89            ) 
    90        except ValueError as error: 
    91            msg = f"invalid request: {error}" 
    92            raise TimestampError(msg) 
    93 
    94        # Use single use session to avoid potential Session thread safety issues 
    95        session = requests.Session() 
    96        session.headers.update( 
    97            { 
    98                "Content-Type": "application/timestamp-query", 
    99                "User-Agent": USER_AGENT, 
    100            } 
    101        ) 
    102 
    103        # Send it to the TSA for signing 
    104        try: 
    105            response = session.post( 
    106                self.url, 
    107                data=timestamp_request.as_bytes(), 
    108                timeout=CLIENT_TIMEOUT, 
    109            ) 
    110            response.raise_for_status() 
    111        except requests.RequestException as error: 
    112            msg = f"error while sending the request to the TSA: {error}" 
    113            raise TimestampError(msg) 
    114 
    115        # Check that we can parse the response but do not *verify* it 
    116        try: 
    117            timestamp_response = decode_timestamp_response(response.content) 
    118        except ValueError as e: 
    119            msg = f"invalid response: {e}" 
    120            raise TimestampError(msg) 
    121 
    122        return timestamp_response