| 1 | + | # Copyright (c) Microsoft Corporation. |
| 2 | + | # Licensed under the MIT License. |
| 3 | + | |
| 4 | + | import requests |
| 5 | + | import collections |
| 6 | + | import sys |
| 7 | + | from retry import retry |
| 8 | + | |
| 9 | + | |
| 10 | + | class NvdApiError(Exception): |
| 11 | + | """Exception raised for errors in the API request """ |
| 12 | + | def __init__(self, params, msg): |
| 13 | + | self.message = f"Received an error during the API process with the following params: {params}. The API message: {msg}" |
| 14 | + | super().__init__(self.message) |
| 15 | + | |
| 16 | + | class hashabledict(dict): |
| 17 | + | def __key(self): |
| 18 | + | summ = [] |
| 19 | + | for k in sorted(self): |
| 20 | + | v = self[k] |
| 21 | + | if type(v) == dict: |
| 22 | + | summ.append((k, hashabledict(v))) |
| 23 | + | elif type(v) == list: |
| 24 | + | summ.append((k, tuple(sorted(str(v))))) |
| 25 | + | else: |
| 26 | + | summ.append((k, v)) |
| 27 | + | return tuple(summ) |
| 28 | + | |
| 29 | + | def __hash__(self): |
| 30 | + | return hash(self.__key()) |
| 31 | + | |
| 32 | + | def __eq__(self, other): |
| 33 | + | return self.__key() == other.__key() |
| 34 | + | |
| 35 | + | |
| 36 | + | class CVEsInterface(): |
| 37 | + | def __init__(self): |
| 38 | + | self._ver_cves = collections.defaultdict(list) |
| 39 | + | |
| 40 | + | @retry((NvdApiError, Exception), tries=3, delay=2) |
| 41 | + | def _web_api_query(self, url, params=None): |
| 42 | + | response = requests.get(f"{url}", params=params, timeout=10) |
| 43 | + | if response.status_code == 200: |
| 44 | + | response = response.json() |
| 45 | + | return response |
| 46 | + | else: |
| 47 | + | msg = "" |
| 48 | + | if "message" in response.json().keys(): |
| 49 | + | msg = response.json()["message"] |
| 50 | + | raise NvdApiError(params, msg) |
| 51 | + | |
| 52 | + | def nist_api(self, vendor, product): |
| 53 | + | resultsPerPage = 500 |
| 54 | + | |
| 55 | + | totalResults = self.get_cves(product, vendor, resultsPerPage, 0) |
| 56 | + | |
| 57 | + | for cur_index in range(resultsPerPage, totalResults, resultsPerPage): |
| 58 | + | self.get_cves(product, vendor, resultsPerPage, cur_index) |
| 59 | + | |
| 60 | + | return self._ver_cves |
| 61 | + | |
| 62 | + | def get_cves(self, product, vendor, resultsPerPage, cur_index): |
| 63 | + | total_results = 0 |
| 64 | + | response = self._web_api_query("https://services.nvd.nist.gov/rest/json/cves/1.0?", |
| 65 | + | params={"keyword": product, "resultsPerPage": resultsPerPage, |
| 66 | + | "startIndex": cur_index}) |
| 67 | + | if response: |
| 68 | + | self._convert_to_ranges(response["result"]["CVE_Items"], vendor, product) |
| 69 | + | total_results = response["totalResults"] |
| 70 | + | return total_results |
| 71 | + | |
| 72 | + | def _convert_to_ranges(self, all_cves_data, vendor, product): |
| 73 | + | for cve_data in all_cves_data: |
| 74 | + | cve = cve_data["cve"]['CVE_data_meta']['ID'] |
| 75 | + | |
| 76 | + | if cve in self._ver_cves.keys(): |
| 77 | + | continue |
| 78 | + | |
| 79 | + | if 'configurations' not in cve_data: |
| 80 | + | print (f'ERROR: No configurations {cve}', file = sys.stderr) |
| 81 | + | else: |
| 82 | + | if 'nodes' not in cve_data['configurations']: |
| 83 | + | print (f'ERROR: No nodes {cve}', file = sys.stderr) |
| 84 | + | else: |
| 85 | + | versions = [] |
| 86 | + | for node in cve_data['configurations']['nodes']: |
| 87 | + | if node['operator'] != 'OR': |
| 88 | + | print(f'DEBUG: No handling for OR operator in node, the following CVE needs to be implemented: {cve}', file=sys.stderr) |
| 89 | + | else: |
| 90 | + | for cpe_match in node['cpe_match']: |
| 91 | + | cpe_res = hashabledict() |
| 92 | + | if 'cpe23Uri' in cpe_match: |
| 93 | + | if not f'{vendor}:{product}' in cpe_match['cpe23Uri']: |
| 94 | + | continue |
| 95 | + | |
| 96 | + | if 'versionStartIncluding' in cpe_match: |
| 97 | + | cpe_res['start_including'] = cpe_match['versionStartIncluding'] |
| 98 | + | if 'versionEndIncluding' in cpe_match: |
| 99 | + | cpe_res['end_including'] = cpe_match['versionEndIncluding'] |
| 100 | + | if 'versionStartExcluding' in cpe_match: |
| 101 | + | cpe_res['start_excluding'] = cpe_match['versionStartExcluding'] |
| 102 | + | if 'versionEndExcluding' in cpe_match: |
| 103 | + | cpe_res['end_excluding'] = cpe_match['versionEndExcluding'] |
| 104 | + | |
| 105 | + | if 'cpe23Uri' in cpe_match: |
| 106 | + | exact_ver = cpe_match['cpe23Uri'].partition(f'cpe:2.3:o:{vendor}:{product}:')[2].partition(':')[0] |
| 107 | + | if exact_ver not in (['*', '']): |
| 108 | + | cpe_res['exact'] = exact_ver |
| 109 | + | |
| 110 | + | if cpe_res: |
| 111 | + | versions.append(cpe_res) |
| 112 | + | if versions: |
| 113 | + | self._ver_cves[cve] = list(set(versions)) |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |