1""" 
    2This module defines an OpenAPIOperation class, a Connexion operation specific for OpenAPI 3 specs. 
    3""" 
    4 
    5import logging 
    6import typing as t 
    7from http import HTTPStatus 
    8 
    9from connexion.datastructures import MediaTypeDict, NoContent 
    10from connexion.operations.abstract import AbstractOperation 
    11from connexion.uri_parsing import OpenAPIURIParser 
    12from connexion.utils import build_example_from_schema, deep_get 
    13 
    14logger = logging.getLogger("connexion.operations.openapi3") 
    15 
    16 
    17class OpenAPIOperation(AbstractOperation): 
    18 
    19    """ 
    20    A single API operation on a path. 
    21    """ 
    22 
    def __init__(
        self,
        method,
        path,
        operation,
        resolver,
        path_parameters=None,
        app_security=None,
        security_schemes=None,
        components=None,
        randomize_endpoint=None,
        uri_parser_class=None,
    ):
        """
        Create an operation from an OpenAPI 3 operation object.

        This class uses the OperationID to identify the module and function that will handle the operation

        From Swagger Specification:

        **OperationID**

        A friendly name for the operation. The id MUST be unique among all operations described in the API.
        Tools and libraries MAY use the operation id to uniquely identify an operation.

        :param method: HTTP method
        :type method: str
        :param path: path this operation is defined on
        :type path: str
        :param operation: swagger operation object
        :type operation: dict
        :param resolver: Callable that maps operationID to a function
        :param path_parameters: Parameters defined in the path level
        :type path_parameters: list
        :param app_security: list of security rules the application uses by default
        :type app_security: list
        :param security_schemes: `Security Definitions Object
            <https://github.com/swagger-api/swagger-spec/blob/master/versions/2.0.md#security-definitions-object>`_
        :type security_schemes: dict
        :param components: `Components Object
            <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#componentsObject>`_
        :type components: dict
        :param randomize_endpoint: number of random characters to append to operation name
        :type randomize_endpoint: integer
        :param uri_parser_class: class to use for uri parsing; defaults to ``OpenAPIURIParser``
        :type uri_parser_class: AbstractURIParser
        """
        self.components = components or {}

        uri_parser_class = uri_parser_class or OpenAPIURIParser

        # Assigned before the base initializer runs, as in the original ordering.
        self._router_controller = operation.get("x-openapi-router-controller")

        super().__init__(
            method=method,
            path=path,
            operation=operation,
            resolver=resolver,
            app_security=app_security,
            security_schemes=security_schemes,
            randomize_endpoint=randomize_endpoint,
            uri_parser_class=uri_parser_class,
        )

        # Work on a copy: extending the list returned by ``operation.get`` in
        # place would append the path-level parameters to the operation
        # object itself, and they would pile up on every construction.
        self._parameters = list(operation.get("parameters", []))
        if path_parameters:
            self._parameters.extend(path_parameters)

        self._responses = operation.get("responses", {})

        # TODO figure out how to support multiple mimetypes
        # NOTE we currently just combine all of the possible mimetypes,
        #      but we need to refactor to support mimetypes by response code
        response_content_types = []
        for defn in self._responses.values():
            response_content_types.extend(defn.get("content", {}).keys())
        self._produces = response_content_types
        # Computed lazily by the ``consumes`` property.
        self._consumes = None

        # Pass the values as arguments so formatting only happens when the
        # debug level is enabled.
        logger.debug("consumes: %s", self.consumes)
        logger.debug("produces: %s", self.produces)
    102 
    103    @classmethod 
    104    def from_spec(cls, spec, *args, path, method, resolver, **kwargs): 
    105        return cls( 
    106            method, 
    107            path, 
    108            spec.get_operation(path, method), 
    109            resolver=resolver, 
    110            path_parameters=spec.get_path_params(path), 
    111            app_security=spec.security, 
    112            security_schemes=spec.security_schemes, 
    113            components=spec.components, 
    114            *args, 
    115            **kwargs, 
    116        ) 
    117 
    118    @property 
    119    def request_body(self): 
    120        return self._operation.get("requestBody", {}) 
    121 
    122    @property 
    123    def parameters(self): 
    124        return self._parameters 
    125 
    126    @property 
    127    def consumes(self): 
    128        if self._consumes is None: 
    129            request_content = self.request_body.get("content", {}) 
    130            self._consumes = list(request_content.keys()) 
    131        return self._consumes 
    132 
    133    @property 
    134    def produces(self): 
    135        return self._produces 
    136 
    137    def with_definitions(self, schema: dict): 
    138        if self.components: 
    139            schema.setdefault("schema", {}) 
    140            schema["schema"]["components"] = self.components 
    141        return schema 
    142 
    143    def response_schema(self, status_code=None, content_type=None): 
    144        response_definition = self.response_definition(status_code, content_type) 
    145        content_definition = response_definition.get("content", response_definition) 
    146        content_definition = content_definition.get(content_type, content_definition) 
    147        if "schema" in content_definition: 
    148            return self.with_definitions(content_definition).get("schema", {}) 
    149        return {} 
    150 
    def example_response(self, status_code=None, content_type=None):
        """
        Build an example response from the spec.

        Lookup order under the selected response and media type: the ``value``
        of the first entry in ``examples``, then ``example``, then the schema's
        own ``example``, and finally an example generated from the schema.

        :param status_code: response to use; defaults to the first key of the sorted responses
        :param content_type: media type to use; defaults to ``self.get_mimetype()``
        :return: tuple of (example body, integer status code)
        """
        if not status_code:
            # simply use the first/lowest status code, this is probably 200 or 201
            status_code = sorted(self._responses.keys())[0]
        if not content_type:
            content_type = self.get_mimetype()

        # Lookup paths use the response key exactly as given (e.g. "default"),
        # so build the prefix before the status code is coerced to an int.
        media_path = [str(status_code), "content", content_type]

        try:
            status_code = int(status_code)
        except ValueError:
            # Non-numeric response keys are reported as 200.
            status_code = 200

        if status_code == HTTPStatus.NO_CONTENT:
            return NoContent, status_code

        try:
            # TODO also use example header?
            named_examples = deep_get(self._responses, media_path + ["examples"])
            return list(named_examples.values())[0]["value"], status_code
        except (KeyError, IndexError):
            pass

        for tail in (["example"], ["schema", "example"]):
            try:
                return deep_get(self._responses, media_path + tail), status_code
            except KeyError:
                continue

        try:
            schema = deep_get(self._responses, media_path + ["schema"])
        except KeyError:
            return ("No example response or response schema defined.", status_code)

        return build_example_from_schema(schema), status_code
    201 
    202    def get_path_parameter_types(self): 
    203        types = {} 
    204        path_parameters = (p for p in self.parameters if p["in"] == "path") 
    205        for path_defn in path_parameters: 
    206            path_schema = path_defn["schema"] 
    207            if ( 
    208                path_schema.get("type") == "string" 
    209                and path_schema.get("format") == "path" 
    210            ): 
    211                # path is special case for type 'string' 
    212                path_type = "path" 
    213            else: 
    214                path_type = path_schema.get("type") 
    215            types[path_defn["name"]] = path_type 
    216        return types 
    217 
    218    def body_name(self, _content_type: str) -> str: 
    219        return self.request_body.get("x-body-name", "body") 
    220 
    221    def body_schema(self, content_type: t.Optional[str] = None) -> dict: 
    222        """ 
    223        The body schema definition for this operation. 
    224        """ 
    225        return self.body_definition(content_type).get("schema", {}) 
    226 
    227    def body_definition(self, content_type: t.Optional[str] = None) -> dict: 
    228        """ 
    229        The body complete definition for this operation. 
    230 
    231        **There can be one "body" parameter at most.** 
    232        """ 
    233        if self.request_body: 
    234            if content_type is None: 
    235                # TODO: make content type required 
    236                content_type = self.consumes[0] 
    237            if len(self.consumes) > 1: 
    238                logger.warning( 
    239                    "this operation accepts multiple content types, using %s", 
    240                    content_type, 
    241                ) 
    242            content_type_dict = MediaTypeDict(self.request_body.get("content", {})) 
    243            res = content_type_dict.get(content_type, {}) 
    244            return self.with_definitions(res) 
    245        return {}