"""FortiEDR connector script: block an IP address on a FortiGate firewall.

Reads its settings from ``config.json`` in the current working directory.
In test mode it only checks that the blocking address group can be fetched;
otherwise it creates a firewall address record for the event destination IP
and adds that record to the blocking address group.
"""
import json
import sys

import requests
# Disable requests' warnings for insecure connections
from requests.packages.urllib3.exceptions import InsecureRequestWarning

try:
    from urllib.parse import quote
except ImportError:  # Python 2 fallback
    from urllib import quote

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

with open('config.json') as json_file:
    conf = json.load(json_file)

# Process exit codes reported by this script.
GENERAL_FAILURE = 160
UNAUTHORIZED = 161
BAD_GROUP_NAME = 162
TEST_OK = 0

# Connector parameters; a missing or null "PARAMS" section behaves as empty.
_PARAMS = conf.get('PARAMS') or {}

IP_ADDRESS = _PARAMS.get('ConnectorHost')
PORT = _PARAMS.get('ConnectorPort')
USER = _PARAMS.get('ConnectorUsername')
PASSWORD = _PARAMS.get('ConnectorPassword')
API_TOKEN = _PARAMS.get('ConnectorApiKey')
TEST_MODE = conf.get('TestMode')
# The event destination is used as the comment, the blocked IP and (prefixed)
# the name of the address record.
OPERATION_COMMENT = _PARAMS.get('EventDestination')
BLOCKED_IP_ADDRESS = _PARAMS.get('EventDestination')
# Stays None when no destination is configured so that importing the module
# (and running test mode) does not crash on "str + None".
OPERATION_NAME = ("FortiEDR_" + BLOCKED_IP_ADDRESS) if BLOCKED_IP_ADDRESS else None
POLICY_GROUP_NAME = 'FortiEDR-Blocked-address'


class FortiGate:
    """Small client for the FortiGate REST calls used by this script.

    Requests are authenticated with an API token when one is supplied;
    otherwise every call logs in with username/password, performs the request
    and logs out again. TLS certificate verification is disabled on all calls.
    """

    def __init__(self, ipaddr, username, password, api_token, timeout=10, vdom="root", port="443"):
        """
        Store connection settings and prepare token authentication if possible

        :param ipaddr: Host name or IP address of the FortiGate
        :param username: Username for session login (used when no token is given)
        :param password: Password for session login (used when no token is given)
        :param api_token: API token; when truthy, token authentication is used
        :param timeout: Timeout in seconds passed to every HTTP request
        :param vdom: VDOM sent as the "vdom" query parameter
        :param port: HTTPS port; None or empty falls back to "443"
        """
        self.ipaddr = ipaddr
        self.username = username
        self.password = password
        # A missing port in the configuration arrives here as None; without
        # the fallback the base URL would become "https://host:None/".
        self.port = port if port else "443"
        self.urlbase = "https://{ipaddr}:{port}/".format(ipaddr=self.ipaddr, port=self.port)
        self.timeout = timeout
        self.vdom = vdom
        self.api_token = api_token
        # Decide from the token given to THIS instance, not the module global.
        self.use_token = bool(api_token)
        self.params = {'vdom': self.vdom}
        self.headers = {}
        if self.use_token:
            self.headers = {'Authorization': 'Bearer ' + self.api_token}
            self.params['access_token'] = self.api_token

    # Login / Logout Handlers
    def login(self):
        """
        Log in to FortiGate with info provided in during class instantiation

        :return: Open Session
        :raises requests.HTTPError: if the post-login check request fails
        """
        session = requests.session()
        try:
            url = self.urlbase + 'logincheck'

            # Login. Credentials are percent-encoded so characters such as
            # "&", "=" or "%" cannot corrupt the form body.
            session.post(url,
                         data='username={username}&secretkey={password}'.format(
                             username=quote(self.username or '', safe=''),
                             password=quote(self.password or '', safe='')),
                         verify=False,
                         timeout=self.timeout)

            # Get CSRF token from cookies, add to headers
            for cookie in session.cookies:
                if cookie.name == 'ccsrftoken':
                    csrftoken = cookie.value[1:-1]  # strip quotes
                    session.headers.update({'X-CSRFTOKEN': csrftoken})

            # Check whether login was successful. Uses the same TLS/timeout
            # settings as every other call so self-signed certificates work.
            login_check = session.get(self.urlbase + "api/v2/cmdb/system/vdom",
                                      verify=False,
                                      timeout=self.timeout)
            login_check.raise_for_status()
        except Exception:
            # Do not leak the connection pool when login fails.
            session.close()
            raise
        return session

    def logout(self, session):
        """
        Log out of device and release the session's connections

        :param session: Session created by login method
        :return: None
        """
        url = self.urlbase + 'logout'
        try:
            session.get(url, verify=False, timeout=self.timeout)
        finally:
            session.close()

    # General Logic Methods
    def _request(self, method, url, data=None):
        """
        Send one HTTP request using token or session authentication

        :param method: HTTP method name, e.g. "GET", "PUT", "POST"
        :param url: Target URL
        :param data: Optional request body (JSON string)
        :return: requests.Response object
        """
        if self.use_token:
            # Work on a copy so the shared header dict is never mutated.
            headers = dict(self.headers)
            if data is not None:
                headers['content-type'] = 'application/json'
            return requests.request(method, url, data=data, headers=headers, verify=False,
                                    timeout=self.timeout, params=self.params)

        session = self.login()
        try:
            return session.request(method, url, data=data, verify=False,
                                   timeout=self.timeout, params=self.params)
        finally:
            # Always log out, even when the request itself raised.
            self.logout(session)

    def does_exist(self, object_url):
        """
        GET URL to assert whether it exists within the firewall

        :param object_url: Object to locate
        :return: Bool - True if exists, False if not
        """
        return self._request('GET', object_url).status_code == 200

    # API Interaction Methods
    def get(self, url):
        """
        Perform GET operation on provided URL

        :param url: Target of GET operation
        :return: Request result if successful (type list), HTTP status code otherwise (type int)
        """
        response = self._request('GET', url)
        if response.status_code == 200:
            return response.json()['results']
        return response.status_code

    def put(self, url, data):
        """
        Perform PUT operation on provided URL

        :param url: Target of PUT operation
        :param data: JSON data. MUST be a correctly formatted string. e.g. "{'key': 'value'}"
        :return: HTTP status code returned from PUT operation
        """
        return self._request('PUT', url, data=data).status_code

    def post(self, url, data):
        """
        Perform POST operation on provided URL

        :param url: Target of POST operation
        :param data: JSON data. MUST be a correctly formatted string. e.g. "{'key': 'value'}"
        :return: HTTP status code returned from POST operation
        """
        return self._request('POST', url, data=data).status_code

    def create_firewall_address(self, address, data):
        """
        Create firewall address record

        :param address: Address record to be created
        :param data: JSON Data with which to create the address record
        :return: HTTP Status Code; 424 if the address record already exists
        """
        api_url = self.urlbase + "api/v2/cmdb/firewall/address/"
        # Check whether target object already exists
        if self.does_exist(api_url + address):
            return 424
        return self.post(api_url, data)

    # Address Group Methods
    def get_address_group(self, specific=False):
        """
        Get address group object information from firewall

        :param specific: If provided, a specific object will be returned. If not, all objects will be returned.
        :return: JSON data for all objects in scope of request, nested in a list,
                 or the HTTP status code (type int) if the request failed.
        """
        api_url = self.urlbase + "api/v2/cmdb/firewall/addrgrp/"
        if specific:
            api_url += specific
        return self.get(api_url)

    def update_address_group(self, group_name, data):
        """
        Update address group with provided data

        :param group_name: Address group being updated
        :param data: JSON Data with which to update the address group
        :return: HTTP Status Code; 404 if the group does not exist
        """
        api_url = self.urlbase + "api/v2/cmdb/firewall/addrgrp/" + group_name
        # Check whether target object already exists
        if not self.does_exist(api_url):
            print('Requested address group "{group_name}" does not exist in Firewall config.'.format(
                group_name=group_name))
            return 404
        return self.put(api_url, data)


BLOCK_POLICY_GROUP_NAME = POLICY_GROUP_NAME  # "enSilo Blocked IPs"


def script_test(fortigate):
    """
    Monitor the system's status, mainly for testing mode

    :param fortigate: FortiGate client to test
    :return: TEST_OK, UNAUTHORIZED, BAD_GROUP_NAME or GENERAL_FAILURE
    """
    try:
        grp = fortigate.get_address_group(BLOCK_POLICY_GROUP_NAME)
    except Exception as e:
        # Top-level boundary of the test: any failure maps to an exit code.
        if isinstance(e, requests.HTTPError) and e.response.status_code == 401:
            print("Unauthorized")
            return UNAUTHORIZED
        print("Bad address")
        return GENERAL_FAILURE

    # An int result is the HTTP status code of a failed GET.
    if isinstance(grp, int):
        if grp == 401:
            print("Unauthorized")
            return UNAUTHORIZED
        print(grp)
        print("Bad group name")
        return BAD_GROUP_NAME

    print("test OK")
    return TEST_OK


def block_ip(fortigate, ip, name, comment):
    """
    Create an address record for ``ip`` and add it to the blocking group

    :param fortigate: FortiGate client
    :param ip: IP address to block (used as both start and end of the range)
    :param name: Name of the address record to create
    :param comment: Comment stored on the address record
    :return: HTTP status code of the first failing step, or of the final
             group update (200 on success, 424 if the address already exists)
    """
    address_record = {
        "comment": comment,
        "color": 6,
        "interface": "",
        "allow-routing": "disable",
        "type": "iprange",
        "q_origin_key": name,
        "visibility": "disable",
        "end-ip": ip,
        "start-ip": ip,
        "name": name,
    }
    res = fortigate.create_firewall_address(name, json.dumps(address_record))
    if res != 200:
        return res

    # add the address to the group.
    grp = fortigate.get_address_group(BLOCK_POLICY_GROUP_NAME)
    if isinstance(grp, int):
        print("Failed to get address group {}".format(BLOCK_POLICY_GROUP_NAME))
        return grp
    if not grp:
        # A 200 answer with an empty result list: nothing to update.
        print("Failed to get address group {}".format(BLOCK_POLICY_GROUP_NAME))
        return 404

    members = grp[0].get('member') or []
    # A group holding only the "none" entry is treated as having no members.
    if len(members) == 1 and members[0]['name'] == 'none':
        members = []
    if not any(member.get('name') == name for member in members):
        members.append({'name': name, 'q_origin_key': name})

    return fortigate.update_address_group(BLOCK_POLICY_GROUP_NAME, json.dumps({"member": members}))


def main():
    """
    Run the test or the block operation, depending on the configuration

    :return: Process exit code
    """
    # Setup FortiGate.
    fortigate = FortiGate(ipaddr=IP_ADDRESS, username=USER, password=PASSWORD, api_token=API_TOKEN, port=PORT)

    if TEST_MODE:
        return script_test(fortigate)

    if not BLOCKED_IP_ADDRESS:
        print("No EventDestination configured, nothing to block")
        return GENERAL_FAILURE

    status_code = block_ip(fortigate, BLOCKED_IP_ADDRESS, OPERATION_NAME, OPERATION_COMMENT)
    if status_code == 424:
        print("Address exists in the FW already")
    elif status_code != 200:
        return GENERAL_FAILURE
    else:
        print("{} IP is blocked.".format(BLOCKED_IP_ADDRESS))
    return 0


if __name__ == "__main__":
    # sys.exit is always available; the bare exit() helper is only injected
    # by the site module.
    sys.exit(main())