From e303b6f131f05f4ca700edd6d4241c0d660d0b86 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Mon, 4 Nov 2024 21:45:51 +0100
Subject: [PATCH] Improve scheme parsing.

Only match if category matches.
Disregard unwanted warnings.
Add progress bars everywhere.
---
 src/sec_certs/dataset/cc_scheme.py |   6 +-
 src/sec_certs/model/cc_matching.py |  23 +++
 src/sec_certs/sample/cc_scheme.py  | 227 ++++++++++++++++++-----------
 3 files changed, 166 insertions(+), 90 deletions(-)

diff --git a/src/sec_certs/dataset/cc_scheme.py b/src/sec_certs/dataset/cc_scheme.py
index e099f9de..19f1a3de 100644
--- a/src/sec_certs/dataset/cc_scheme.py
+++ b/src/sec_certs/dataset/cc_scheme.py
@@ -49,13 +49,15 @@ def from_dict(cls, dct: Mapping) -> CCSchemeDataset:
         return cls(dct["schemes"])
 
     @classmethod
-    def from_web(cls, only_schemes: set[str] | None = None) -> CCSchemeDataset:
+    def from_web(
+        cls, only_schemes: set[str] | None = None, enhanced: bool | None = None, artifacts: bool | None = None
+    ) -> CCSchemeDataset:
         schemes = {}
         for scheme, sources in CCScheme.methods.items():
             if only_schemes is not None and scheme not in only_schemes:
                 continue
             try:
-                schemes[scheme] = CCScheme.from_web(scheme, sources.keys())
+                schemes[scheme] = CCScheme.from_web(scheme, sources.keys(), enhanced=enhanced, artifacts=artifacts)
             except Exception as e:
                 logger.warning(f"Could not download CC scheme: {scheme} due to error {e}.")
         return cls(schemes)
diff --git a/src/sec_certs/model/cc_matching.py b/src/sec_certs/model/cc_matching.py
index 21d94b74..9930fb66 100644
--- a/src/sec_certs/model/cc_matching.py
+++ b/src/sec_certs/model/cc_matching.py
@@ -10,6 +10,24 @@
 from sec_certs.sample.cc_certificate_id import CertificateId
 from sec_certs.utils.strings import fully_sanitize_string
 
+CATEGORIES = {
+    "ICs, Smart Cards and Smart Card-Related Devices and Systems",
+    "Other Devices and Systems",
+    "Network and Network-Related Devices and Systems",
+    "Multi-Function Devices",
+    "Boundary Protection Devices and Systems",
+    "Data Protection",
+    "Operating Systems",
+    "Products for Digital Signatures",
+    "Access Control Devices and Systems",
+    "Mobility",
+    "Databases",
+    "Trusted Computing",
+    "Detection Devices and Systems",
+    "Key Management Systems",
+    "Biometric Systems and Devices",
+}
+
 
 class CCSchemeMatcher(AbstractMatcher[CCCertificate]):
     """
@@ -46,6 +64,8 @@ def _prepare(self):
         if vendor_name := self._get_from_entry("vendor", "developer", "manufacturer", "supplier"):
             self._vendor = fully_sanitize_string(vendor_name)
 
+        self._category = self._get_from_entry("category")
+
         self._report_hash = self._get_from_entry("report_hash")
         self._target_hash = self._get_from_entry("target_hash")
 
@@ -69,6 +89,9 @@ def match(self, cert: CCCertificate) -> float:
         # We need to have something to match to.
         if self._product is None or self._vendor is None or cert.name is None or cert.manufacturer is None:
             return 0
+        # It is a correctly parsed category but the wrong one.
+        if self._category in CATEGORIES and self._category != cert.category:
+            return 0
         cert_name = fully_sanitize_string(cert.name)
         cert_manufacturer = fully_sanitize_string(cert.manufacturer)
         # If we match exactly, return early.
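Note: the cc_matching.py change above gates fuzzy matching on the certificate category. _prepare() now records the scheme entry's "category" field, and match() returns a score of 0 whenever the entry carries a correctly parsed (known) category that differs from the certificate's. A minimal standalone sketch of that guard follows; the two-element CATEGORIES set is abbreviated for illustration (the patch defines all fifteen), and category_gate is a hypothetical name, not part of the patch:

    # Hypothetical standalone sketch of the category gate in CCSchemeMatcher.match.
    CATEGORIES = {
        "Operating Systems",
        "Databases",
        # ... abbreviated; the real set holds fifteen categories ...
    }

    def category_gate(entry_category, cert_category):
        # Only a correctly parsed category may veto a match; None or free-form
        # values fall through to the usual name/vendor similarity matching.
        if entry_category in CATEGORIES and entry_category != cert_category:
            return False
        return True

    assert category_gate("Databases", "Operating Systems") is False  # known, but wrong
    assert category_gate("some unparsed text", "Operating Systems") is True  # unknown, no veto

This mirrors the guard's behaviour: a mismatch between two known categories rejects the candidate before any string similarity is computed.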
diff --git a/src/sec_certs/sample/cc_scheme.py b/src/sec_certs/sample/cc_scheme.py
index 080e3f52..4c46d67e 100644
--- a/src/sec_certs/sample/cc_scheme.py
+++ b/src/sec_certs/sample/cc_scheme.py
@@ -11,6 +11,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
+from inspect import signature
 from pathlib import Path
 from typing import Any, ClassVar
 from urllib.parse import urljoin
@@ -19,7 +20,7 @@
 import tabula
 from bs4 import BeautifulSoup, NavigableString, Tag
 from dateutil.parser import isoparse
-from requests import Response
+from requests import ConnectionError, HTTPError, Response
 from urllib3.connectionpool import InsecureRequestWarning
 
 from sec_certs import constants
@@ -50,6 +51,8 @@
     "get_korea_certified",
     "get_korea_suspended",
     "get_korea_archived",
+    "get_poland_certified",
+    "get_poland_ineval",
     "get_singapore_certified",
     "get_singapore_in_evaluation",
     "get_singapore_archived",
@@ -75,7 +78,21 @@
     "Cache-Control": "no-cache",
     "Dnt": "1",
     "Pragma": "no-cache",
-    "Priority": "u=0, i",
+    "Priority": "u=0, i",  # 'ICs, Smart Cards and Smart Card-Related Devices and Systems': 1978,
+    # 'Other Devices and Systems': 1043,
+    # 'Network and Network-Related Devices and Systems': 835,
+    # 'Multi-Function Devices': 671,
+    # 'Boundary Protection Devices and Systems': 253,
+    # 'Data Protection': 240,
+    # 'Operating Systems': 237,
+    # 'Products for Digital Signatures': 179,
+    # 'Access Control Devices and Systems': 155,
+    # 'Mobility': 115,
+    # 'Databases': 103,
+    # 'Trusted Computing': 78,
+    # 'Detection Devices and Systems': 77,
+    # 'Key Management Systems': 59,
+    # 'Biometric Systems and Devices'
     "Sec-Ch-Ua": '"Not?A_Brand";v="99", "Chromium";v="130"',
     "Sec-Ch-Ua-Mobile": "?0",
     "Sec-Ch-Ua-Platform": '"Linux"',
@@ -109,12 +126,15 @@ def _get_page(url: str, session=None, **kwargs) -> BeautifulSoup:
     return BeautifulSoup(_get(url, session, **kwargs).content, "html5lib")
 
 
-def _get_hash(url: str, session=None, **kwargs) -> bytes:
-    resp = _get(url, session, **kwargs)
+def _get_hash(url: str, session=None, **kwargs) -> str | None:
+    try:
+        resp = _get(url, session, **kwargs)
+    except (HTTPError, ConnectionError):
+        return None
     h = hashlib.sha256()
     for chunk in resp.iter_content():
         h.update(chunk)
-    return h.digest()
+    return h.digest().hex()
 
 
 def get_australia_in_evaluation(  # noqa: C901
@@ -240,7 +260,7 @@ def get_canada_in_evaluation() -> list[dict[str, Any]]:
     return results
 
 
-def _get_france(url, enhanced, artifacts) -> list[dict[str, Any]]:  # noqa: C901
+def _get_france(url, enhanced, artifacts, name) -> list[dict[str, Any]]:  # noqa: C901
     session = requests.session()
     challenge_soup = _get_page(constants.CC_ANSSI_BASE_URL, session=session)
     bln_script = challenge_soup.find("head").find_all("script")[1]
@@ -256,6 +276,7 @@ def _get_france(url, enhanced, artifacts) -> list[dict[str, Any]]:  # noqa: C901
         raise ValueError
     pages = int(last_page_a.group())
     results = []
+    pbar = tqdm(desc=f"Get FR scheme {name}.")
     for page in range(pages + 1):
         soup = _get_page(url + f"?page={page}", session=session)
         for row in soup.find_all("article", class_="node--type-produit-certifie-cc"):
@@ -317,17 +338,19 @@ def _get_france(url, enhanced, artifacts) -> list[dict[str, Any]]:  # noqa: C901
                     if "Rapport de certification" in a.text:
                         e["report_link"] = urljoin(constants.CC_ANSSI_BASE_URL, a["href"])
                         if artifacts:
-                            e["report_hash"] = _get_hash(e["report_link"], session=session).hex()
+                            e["report_hash"] = _get_hash(e["report_link"], session=session)
                     elif "Cible de sécurité" in a.text:
e["target_link"] = urljoin(constants.CC_ANSSI_BASE_URL, a["href"]) if artifacts: - e["target_hash"] = _get_hash(e["target_link"], session=session).hex() + e["target_hash"] = _get_hash(e["target_link"], session=session) elif "Certificat" in a.text: e["cert_link"] = urljoin(constants.CC_ANSSI_BASE_URL, a["href"]) if artifacts: - e["cert_hash"] = _get_hash(e["cert_link"], session=session).hex() + e["cert_hash"] = _get_hash(e["cert_link"], session=session) cert["enhanced"] = e + pbar.update() results.append(cert) + pbar.close() return results @@ -339,7 +362,7 @@ def get_france_certified(enhanced: bool = True, artifacts: bool = False) -> list :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_france(constants.CC_ANSSI_CERTIFIED_URL, enhanced, artifacts) + return _get_france(constants.CC_ANSSI_CERTIFIED_URL, enhanced, artifacts, "certified") def get_france_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: # noqa: C901 @@ -350,7 +373,7 @@ def get_france_archived(enhanced: bool = True, artifacts: bool = False) -> list[ :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_france(constants.CC_ANSSI_ARCHIVED_URL, enhanced, artifacts) + return _get_france(constants.CC_ANSSI_ARCHIVED_URL, enhanced, artifacts, "archived") def get_germany_certified( # noqa: C901 @@ -439,15 +462,15 @@ def get_germany_certified( # noqa: C901 if "Certification Report" in title: e["report_link"] = href if artifacts: - e["report_hash"] = _get_hash(href).hex() + e["report_hash"] = _get_hash(href) elif "Security Target" in title: e["target_link"] = href if artifacts: - e["target_hash"] = _get_hash(href).hex() + e["target_hash"] = _get_hash(href) elif "Certificate" in title: e["cert_link"] = href if artifacts: - e["cert_hash"] = _get_hash(href).hex() + e["cert_hash"] = _get_hash(href) description = content.find("div", attrs={"lang": "en"}) if description: e["description"] = sns(description.text) @@ -471,6 +494,7 @@ def get_india_certified() -> list[dict[str, Any]]: pages = {0} seen_pages = set() results = [] + pbar = tqdm(desc="Get IN scheme certified.") while pages: page = pages.pop() seen_pages.add(page) @@ -510,7 +534,9 @@ def get_india_certified() -> list[dict[str, Any]]: "cert_link": urljoin(constants.CC_INDIA_BASE_URL, _fix_india_link(cert_a["href"])), "cert_name": sns(cert_a.text), } + pbar.update() results.append(cert) + pbar.close() return results @@ -523,6 +549,7 @@ def get_india_archived() -> list[dict[str, Any]]: pages = {0} seen_pages = set() results = [] + pbar = tqdm(desc="Get IN scheme archived.") while pages: page = pages.pop() seen_pages.add(page) @@ -564,7 +591,9 @@ def get_india_archived() -> list[dict[str, Any]]: if report_a: cert["report_link"] = urljoin(constants.CC_INDIA_BASE_URL, _fix_india_link(report_a["href"])) cert["report_name"] = sns(report_a.text) + pbar.update() results.append(cert) + pbar.close() return results @@ -577,7 +606,7 @@ def get_italy_certified() -> list[dict[str, Any]]: # noqa: C901 soup = _get_page(constants.CC_ITALY_CERTIFIED_URL) div = soup.find("div", class_="certificati") results = [] - for cert_div in div.find_all("div", recursive=False): + for cert_div in tqdm(div.find_all("div", recursive=False), desc="Get IT scheme certified."): title = cert_div.find("h3").text data_div = cert_div.find("div", class_="collapse") cert = {"title": title} @@ -619,7 +648,7 @@ def 
     soup = _get_page(constants.CC_ITALY_INEVAL_URL)
     div = soup.find("div", class_="valutazioni")
     results = []
-    for cert_div in div.find_all("div", recursive=False):
+    for cert_div in tqdm(div.find_all("div", recursive=False), desc="Get IT scheme in evaluation."):
         title = cert_div.find("h3").text
         data_div = cert_div.find("div", class_="collapse")
         cert = {"title": title}
@@ -639,12 +668,12 @@ def get_italy_in_evaluation() -> list[dict[str, Any]]:
     return results
 
 
-def _get_japan(url, enhanced, artifacts) -> list[dict[str, Any]]:  # noqa: C901
+def _get_japan(url, enhanced, artifacts, name) -> list[dict[str, Any]]:  # noqa: C901
     soup = _get_page(url)
     table = soup.find("table", class_="cert-table")
     results = []
     trs = list(table.find_all("tr"))
-    for tr in trs:
+    for tr in tqdm(trs, desc=f"Get JP scheme {name}."):
         tds = tr.find_all("td")
         if not tds:
             continue
@@ -738,15 +767,15 @@ def _get_japan(url, enhanced, artifacts) -> list[dict[str, Any]]:  # noqa: C901
                     if "Report" in name:
                         e["report_link"] = urljoin(constants.CC_JAPAN_BASE_URL, li_a["href"])
                         if artifacts:
-                            e["report_hash"] = _get_hash(e["report_link"]).hex()
+                            e["report_hash"] = _get_hash(e["report_link"])
                     elif "Certificate" in name:
                         e["cert_link"] = urljoin(constants.CC_JAPAN_BASE_URL, li_a["href"])
                         if artifacts:
-                            e["cert_hash"] = _get_hash(e["cert_link"]).hex()
+                            e["cert_hash"] = _get_hash(e["cert_link"])
                     elif "Target" in name:
                         e["target_link"] = urljoin(constants.CC_JAPAN_BASE_URL, li_a["href"])
                         if artifacts:
-                            e["target_hash"] = _get_hash(e["target_link"]).hex()
+                            e["target_hash"] = _get_hash(e["target_link"])
             e["description"] = sns(main.find("div", id="overviewsbox").text)
             cert["enhanced"] = e
     return results
 
 
 def get_japan_certified(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -760,8 +789,8 @@ def get_japan_certified(enhanced: bool = True, artifacts: bool = False) -> list[
     :param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
     :return: The entries.
     """
-    japan_hw = _get_japan(constants.CC_JAPAN_CERTIFIED_HW_URL, enhanced, artifacts)
-    japan_sw = _get_japan(constants.CC_JAPAN_CERTIFIED_SW_URL, enhanced, artifacts)
+    japan_hw = _get_japan(constants.CC_JAPAN_CERTIFIED_HW_URL, enhanced, artifacts, "certified HW")
+    japan_sw = _get_japan(constants.CC_JAPAN_CERTIFIED_SW_URL, enhanced, artifacts, "certified SW")
     return japan_sw + japan_hw
 
 
 def get_japan_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -773,7 +802,7 @@ def get_japan_archived(enhanced: bool = True, artifacts: bool = False) -> list[d
     :param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
     :return: The entries.
""" - return _get_japan(constants.CC_JAPAN_ARCHIVED_SW_URL, enhanced, artifacts) + return _get_japan(constants.CC_JAPAN_ARCHIVED_SW_URL, enhanced, artifacts, "archived SW") def get_japan_in_evaluation() -> list[dict[str, Any]]: @@ -785,7 +814,7 @@ def get_japan_in_evaluation() -> list[dict[str, Any]]: soup = _get_page(constants.CC_JAPAN_INEVAL_URL) table = soup.find("table") results = [] - for tr in table.find_all("tr"): + for tr in tqdm(table.find_all("tr"), desc="Get JP scheme in evaluation."): tds = tr.find_all("td") if not tds: continue @@ -800,13 +829,14 @@ def get_japan_in_evaluation() -> list[dict[str, Any]]: return results -def _get_malaysia(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C901 +def _get_malaysia(url, enhanced, artifacts, name) -> list[dict[str, Any]]: # noqa: C901 soup = _get_page(url) pages_re = re.search("Page [0-9]+ of ([0-9]+)", soup.find("form").text) if not pages_re: raise ValueError total_pages = int(pages_re.group(1)) results = [] + pbar = tqdm(desc=f"Get MY scheme {name}.") for i in range(total_pages): soup = _get_page(url + f"?start={i * 10}") table = soup.find("table", class_="directoryTable") @@ -857,17 +887,19 @@ def _get_malaysia(url, enhanced, artifacts) -> list[dict[str, Any]]: # noqa: C9 if "ST" in a.text: e["target_link"] = urljoin(constants.CC_MALAYSIA_BASE_URL, a["href"]) if artifacts: - e["target_hash"] = _get_hash(e["target_link"]).hex() + e["target_hash"] = _get_hash(e["target_link"]) elif "CR" in a.text: e["report_link"] = urljoin(constants.CC_MALAYSIA_BASE_URL, a["href"]) if artifacts: - e["report_hash"] = _get_hash(e["report_link"]).hex() + e["report_hash"] = _get_hash(e["report_link"]) elif "Maintenance" in title: pass elif "Status" in title: e["status"] = value cert["enhanced"] = e + pbar.update() results.append(cert) + pbar.close() return results @@ -879,7 +911,7 @@ def get_malaysia_certified(enhanced: bool = True, artifacts: bool = False) -> li :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_malaysia(constants.CC_MALAYSIA_CERTIFIED_URL, enhanced, artifacts) + return _get_malaysia(constants.CC_MALAYSIA_CERTIFIED_URL, enhanced, artifacts, "certified") def get_malaysia_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: @@ -890,7 +922,7 @@ def get_malaysia_archived(enhanced: bool = True, artifacts: bool = False) -> lis :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. 
""" - return _get_malaysia(constants.CC_MALAYSIA_ARCHIVED_URL, enhanced, artifacts) + return _get_malaysia(constants.CC_MALAYSIA_ARCHIVED_URL, enhanced, artifacts, "archived") def get_malaysia_in_evaluation() -> list[dict[str, Any]]: @@ -903,7 +935,7 @@ def get_malaysia_in_evaluation() -> list[dict[str, Any]]: main_div = soup.find("div", attrs={"itemprop": "articleBody"}) table = main_div.find("table") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get MY scheme in evaluation."): tds = tr.find_all("td") if len(tds) != 5: continue @@ -926,7 +958,7 @@ def _get_netherlands_certified_old( # noqa: C901 rows = main_div.find_all("div", class_="row", recursive=False) modals = main_div.find_all("div", class_="modal", recursive=False) results = [] - for row, modal in zip(rows, modals): + for row, modal in tqdm(zip(rows, modals), desc="Get NL scheme certified (old)."): row_entries = row.find_all("a") modal_trs = modal.find_all("tr") cert: dict[str, Any] = { @@ -945,19 +977,19 @@ def _get_netherlands_certified_old( # noqa: C901 elif "Certificate" in th_text: cert["cert_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"]) if artifacts: - cert["cert_hash"] = _get_hash(cert["cert_link"]).hex() + cert["cert_hash"] = _get_hash(cert["cert_link"]) elif "Certificationreport" in th_text: cert["report_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"]) if artifacts: - cert["report_hash"] = _get_hash(cert["report_link"]).hex() + cert["report_hash"] = _get_hash(cert["report_link"]) elif "Securitytarget" in th_text: cert["target_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"]) if artifacts: - cert["target_hash"] = _get_hash(cert["target_link"]).hex() + cert["target_hash"] = _get_hash(cert["target_link"]) elif "Maintenance report" in th_text: cert["maintenance_link"] = urljoin(constants.CC_NETHERLANDS_OLD_BASE_URL, td.find("a")["href"]) if artifacts: - cert["maintenance_hash"] = _get_hash(cert["maintenance_link"]).hex() + cert["maintenance_hash"] = _get_hash(cert["maintenance_link"]) results.append(cert) return results @@ -968,7 +1000,7 @@ def _get_netherlands_certified_new( # noqa: C901 soup = _get_page(constants.CC_NETHERLANDS_NEW_CERTIFIED_URL) table = soup.find("table", class_="wpDataTable") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get NL scheme certified (new)."): tds = tr.find_all("td") cert = { "cert_id": sns(tds[0].text).replace("\n", ""), # type: ignore @@ -985,7 +1017,7 @@ def _get_netherlands_certified_new( # noqa: C901 href = urljoin(constants.CC_NETHERLANDS_NEW_BASE_URL, a["href"]) cert[f"{name}_link"] = href if artifacts: - cert[f"{name}_hash"] = _get_hash(href).hex() + cert[f"{name}_hash"] = _get_hash(href) results.append(cert) return results @@ -1006,7 +1038,7 @@ def _get_netherlands_in_evaluation_old() -> list[dict[str, Any]]: soup = _get_page(constants.CC_NETHERLANDS_OLD_INEVAL_URL) table = soup.find("table") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get NL scheme in evaluation (old)."): tds = tr.find_all("td") cert = { "developer": sns(tds[0].text), @@ -1023,7 +1055,7 @@ def _get_netherlands_in_evaluation_new() -> list[dict[str, Any]]: soup = _get_page(constants.CC_NETHERLANDS_NEW_INEVAL_URL) table = soup.find("table", class_="wpDataTable") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get NL 
         tds = tr.find_all("td")
         cert = {
             "cert_id": sns(tds[0].text),
@@ -1048,11 +1080,11 @@ def get_netherlands_in_evaluation() -> list[dict[str, Any]]:
 
 
 def _get_norway(  # noqa: C901
-    url: str, enhanced: bool, artifacts: bool
+    url: str, enhanced: bool, artifacts: bool, name
 ) -> list[dict[str, Any]]:
     soup = _get_page(url)
     results = []
-    for tr in soup.find_all("tr", class_="certified-product"):
+    for tr in tqdm(soup.find_all("tr", class_="certified-product"), desc=f"Get NO scheme {name}."):
         tds = tr.find_all("td")
         cert: dict[str, Any] = {
             "product": sns(tds[0].text),
@@ -1116,7 +1148,7 @@ def _get_norway(  # noqa: C901
                 a = link.find("a")
                 entry = {"href": urljoin(constants.CC_NORWAY_BASE_URL, a["href"])}
                 if artifacts:
-                    entry["hash"] = _get_hash(entry["href"]).hex()
+                    entry["hash"] = _get_hash(entry["href"])  # type: ignore
                 entries.append(entry)
             e["documents"][doc_type] = entries
         cert["enhanced"] = e
@@ -1132,7 +1164,7 @@ def get_norway_certified(enhanced: bool = True, artifacts: bool = False) -> list
     :param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
     :return: The entries.
     """
-    return _get_norway(constants.CC_NORWAY_CERTIFIED_URL, enhanced, artifacts)
+    return _get_norway(constants.CC_NORWAY_CERTIFIED_URL, enhanced, artifacts, "certified")
 
 
 def get_norway_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]:
@@ -1143,24 +1175,29 @@ def get_norway_archived(enhanced: bool = True, artifacts: bool = False) -> list[
     :param artifacts: Whether to download and compute artifact hashes (way slower, even more data).
     :return: The entries.
     """
-    return _get_norway(constants.CC_NORWAY_ARCHIVED_URL, enhanced, artifacts)
+    return _get_norway(constants.CC_NORWAY_ARCHIVED_URL, enhanced, artifacts, "archived")
 
 
 def _get_korea(  # noqa: C901
-    product_class: int, enhanced: bool, artifacts: bool
+    product_class: int, enhanced: bool, artifacts: bool, name
 ) -> list[dict[str, Any]]:
     session = requests.session()
-    session.get(constants.CC_KOREA_EN_URL, verify=False)
+    _get_page(constants.CC_KOREA_EN_URL, session=session)  # Get base page
     url = constants.CC_KOREA_CERTIFIED_URL + f"?product_class={product_class}"
     soup = _get_page(url, session=session)
     seen_pages = set()
     pages = {1}
     results = []
+    pbar = tqdm(desc=f"Get KR scheme {name}.")
     while pages:
         page = pages.pop()
         csrf = soup.find("form", id="fm").find("input", attrs={"name": "csrf"})["value"]
-        resp = session.post(url, data={"csrf": csrf, "selectPage": page, "product_class": product_class}, verify=False)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", category=InsecureRequestWarning)
+            resp = session.post(
+                url, data={"csrf": csrf, "selectPage": page, "product_class": product_class}, verify=False
+            )
         soup = BeautifulSoup(resp.content, "html5lib")
         tbody = soup.find("table", class_="cpl").find("tbody")
         for tr in tbody.find_all("tr"):
@@ -1222,20 +1259,21 @@ def _get_korea(  # noqa: C901
             elif "Certificate" in title and a:
                 v["cert_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
                 if artifacts:
-                    v["cert_hash"] = _get_hash(v["cert_link"], session).hex()
+                    v["cert_hash"] = _get_hash(v["cert_link"], session)
             elif "Security Target" in title and a:
                 v["target_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
                 if artifacts:
-                    v["target_hash"] = _get_hash(v["target_link"], session).hex()
+                    v["target_hash"] = _get_hash(v["target_link"], session)
             elif "Certification Report" in title and a:
                 v["report_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"])
a["href"]) if artifacts: - v["report_hash"] = _get_hash(v["report_link"], session).hex() + v["report_hash"] = _get_hash(v["report_link"], session) elif "Maintenance Report" in title and a: v["maintenance_link"] = urljoin(constants.CC_KOREA_BASE_URL, a["href"]) if artifacts: - v["maintenance_hash"] = _get_hash(v["maintenance_link"], session).hex() + v["maintenance_hash"] = _get_hash(v["maintenance_link"], session) cert["enhanced"] = e + pbar.update() results.append(cert) seen_pages.add(page) page_links = soup.find("div", class_="paginate").find_all("a", class_="number_off") @@ -1246,6 +1284,7 @@ def _get_korea( # noqa: C901 pages.add(new_page) except Exception: pass + pbar.close() return results @@ -1257,7 +1296,7 @@ def get_korea_certified(enhanced: bool = True, artifacts: bool = False) -> list[ :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_korea(product_class=1, enhanced=enhanced, artifacts=artifacts) + return _get_korea(product_class=1, enhanced=enhanced, artifacts=artifacts, name="certified") def get_korea_suspended(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: @@ -1268,7 +1307,7 @@ def get_korea_suspended(enhanced: bool = True, artifacts: bool = False) -> list[ :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_korea(product_class=2, enhanced=enhanced, artifacts=artifacts) + return _get_korea(product_class=2, enhanced=enhanced, artifacts=artifacts, name="suspended") def get_korea_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: @@ -1279,7 +1318,7 @@ def get_korea_archived(enhanced: bool = True, artifacts: bool = False) -> list[d :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. 
""" - return _get_korea(product_class=4, enhanced=enhanced, artifacts=artifacts) + return _get_korea(product_class=4, enhanced=enhanced, artifacts=artifacts, name="archived") def get_poland_certified(artifacts: bool = False) -> list[dict[str, Any]]: @@ -1292,7 +1331,7 @@ def get_poland_certified(artifacts: bool = False) -> list[dict[str, Any]]: soup = _get_page(constants.CC_POLAND_CERTIFIED_URL) table = soup.find("table", class_="cert_tb") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get PL scheme certified."): tds = tr.find_all("td") cert = { "client": sns(tds[0].text), @@ -1316,7 +1355,7 @@ def get_poland_certified(artifacts: bool = False) -> list[dict[str, Any]]: href = urljoin(constants.CC_POLAND_BASE_URL, a["href"]) cert[f"{name}_link"] = href if artifacts: - cert[f"{name}_hash"] = _get_hash(href).hex() + cert[f"{name}_hash"] = _get_hash(href) results.append(cert) return results @@ -1332,7 +1371,7 @@ def get_poland_ineval() -> list[dict[str, Any]]: soup = _get_page(constants.CC_POLAND_INEVAL_URL) table = soup.find("table", class_="cert_tb") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get PL scheme in evaluation."): tds = tr.find_all("td") cert = { "client": sns(tds[0].text), @@ -1351,7 +1390,7 @@ def get_poland_ineval() -> list[dict[str, Any]]: return results -def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]: +def _get_singapore(url: str, artifacts: bool, name) -> list[dict[str, Any]]: soup = _get_page(url) page_id = str(soup.find("input", id="CurrentPageId").value) page = 1 @@ -1368,6 +1407,7 @@ def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]: api_json = api_call.json() total = api_json["total"] results: list[dict[str, Any]] = [] + pbar = tqdm(total=total, desc=f"Get SG scheme {name}.") while len(results) != total: for obj in api_json["objects"]: cert: dict[str, Any] = { @@ -1389,9 +1429,10 @@ def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]: "target_link": urljoin(constants.CC_SINGAPORE_BASE_URL, obj["securityTarget"]["mediaUrl"]), } if artifacts: - cert["cert_hash"] = _get_hash(cert["cert_link"]).hex() - cert["report_hash"] = _get_hash(cert["report_link"]).hex() - cert["target_hash"] = _get_hash(cert["target_link"]).hex() + cert["cert_hash"] = _get_hash(cert["cert_link"]) + cert["report_hash"] = _get_hash(cert["report_link"]) + cert["target_hash"] = _get_hash(cert["target_link"]) + pbar.update() results.append(cert) page += 1 api_call = requests.post( @@ -1405,6 +1446,7 @@ def _get_singapore(url: str, artifacts: bool) -> list[dict[str, Any]]: }, ) api_json = api_call.json() + pbar.close() return results @@ -1415,7 +1457,7 @@ def get_singapore_certified(artifacts: bool = False) -> list[dict[str, Any]]: :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. 
""" - return _get_singapore(constants.CC_SINGAPORE_CERTIFIED_URL, artifacts) + return _get_singapore(constants.CC_SINGAPORE_CERTIFIED_URL, artifacts, "certified") def get_singapore_in_evaluation() -> list[dict[str, Any]]: @@ -1433,7 +1475,7 @@ def get_singapore_in_evaluation() -> list[dict[str, Any]]: else: raise ValueError("Cannot find table.") results = [] - for tr in table.find_all("tr")[1:]: + for tr in tqdm(table.find_all("tr")[1:], desc="Get SG scheme in evaluation."): tds = tr.find_all("td") cert = { "name": sns(tds[0].text), @@ -1451,7 +1493,7 @@ def get_singapore_archived(artifacts: bool = False) -> list[dict[str, Any]]: :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_singapore(constants.CC_SINGAPORE_ARCHIVED_URL, artifacts) + return _get_singapore(constants.CC_SINGAPORE_ARCHIVED_URL, artifacts, "archived") def get_spain_certified() -> list[dict[str, Any]]: @@ -1463,7 +1505,7 @@ def get_spain_certified() -> list[dict[str, Any]]: soup = _get_page(constants.CC_SPAIN_CERTIFIED_URL) tbody = soup.find("table", class_="djc_items_table").find("tbody") results = [] - for tr in tbody.find_all("tr", recursive=False): + for tr in tqdm(tbody.find_all("tr", recursive=False), desc="Get ES scheme certified."): tds = tr.find_all("td") cert = { "product": sns(tds[0].text), @@ -1477,12 +1519,12 @@ def get_spain_certified() -> list[dict[str, Any]]: def _get_sweden( # noqa: C901 - url: str, enhanced: bool, artifacts: bool + url: str, enhanced: bool, artifacts: bool, name ) -> list[dict[str, Any]]: soup = _get_page(url) nav = soup.find("main").find("nav", class_="component-nav-box__list") results = [] - for link in nav.find_all("a"): + for link in tqdm(nav.find_all("a"), desc=f"Get SE scheme {name}."): cert: dict[str, Any] = { "product": sns(link.text), "url": urljoin(constants.CC_SWEDEN_BASE_URL, link["href"]), @@ -1527,15 +1569,15 @@ def _get_sweden( # noqa: C901 elif "Security Target" in title and a: e["target_link"] = urljoin(constants.CC_SWEDEN_BASE_URL, a["href"]) if artifacts: - e["target_hash"] = _get_hash(e["target_link"]).hex() + e["target_hash"] = _get_hash(e["target_link"]) elif "Certifieringsrapport" in title and a: e["report_link"] = urljoin(constants.CC_SWEDEN_BASE_URL, a["href"]) if artifacts: - e["report_hash"] = _get_hash(e["report_hash"]).hex() + e["report_hash"] = _get_hash(e["report_hash"]) elif "Certifikat" in title and a: e["cert_link"] = urljoin(constants.CC_SWEDEN_BASE_URL, a["href"]) if artifacts: - e["cert_hash"] = _get_hash(e["cert_link"]).hex() + e["cert_hash"] = _get_hash(e["cert_link"]) cert["enhanced"] = e results.append(cert) return results @@ -1549,7 +1591,7 @@ def get_sweden_certified(enhanced: bool = True, artifacts: bool = False) -> list :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_sweden(constants.CC_SWEDEN_CERTIFIED_URL, enhanced, artifacts) + return _get_sweden(constants.CC_SWEDEN_CERTIFIED_URL, enhanced, artifacts, "certified") def get_sweden_in_evaluation(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: @@ -1560,7 +1602,7 @@ def get_sweden_in_evaluation(enhanced: bool = True, artifacts: bool = False) -> :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. 
""" - return _get_sweden(constants.CC_SWEDEN_INEVAL_URL, enhanced, artifacts) + return _get_sweden(constants.CC_SWEDEN_INEVAL_URL, enhanced, artifacts, "in evaluation") def get_sweden_archived(enhanced: bool = True, artifacts: bool = False) -> list[dict[str, Any]]: @@ -1571,7 +1613,7 @@ def get_sweden_archived(enhanced: bool = True, artifacts: bool = False) -> list[ :param artifacts: Whether to download and compute artifact hashes (way slower, even more data). :return: The entries. """ - return _get_sweden(constants.CC_SWEDEN_ARCHIVED_URL, enhanced, artifacts) + return _get_sweden(constants.CC_SWEDEN_ARCHIVED_URL, enhanced, artifacts, "archived") def get_turkey_certified() -> list[dict[str, Any]]: @@ -1588,7 +1630,7 @@ def get_turkey_certified() -> list[dict[str, Any]]: with pdf_path.open("wb") as f: f.write(resp.content) dfs = tabula.read_pdf(str(pdf_path), pages="all") - for df in dfs: + for df in tqdm(dfs, desc="Get TR scheme certified."): for line in df.values: # type: ignore values = [value if not (isinstance(value, float) and math.isnan(value)) else None for value in line] cert = { @@ -1608,7 +1650,7 @@ def get_turkey_certified() -> list[dict[str, Any]]: return results -def _get_usa(args, enhanced: bool, artifacts: bool): # noqa: C901 +def _get_usa(args, enhanced: bool, artifacts: bool, name): # noqa: C901 # TODO: There is more information in the API (like about PPs, etc.) def map_cert(cert, files=None): # noqa: C901 result = { @@ -1629,23 +1671,23 @@ def map_cert(cert, files=None): # noqa: C901 result["id"] += f"-{dt.year}" result["report_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}" if artifacts: - result["report_hash"] = _get_hash(result["report_link"]).hex() + result["report_hash"] = _get_hash(result["report_link"]) elif file["file_label"] == "CC Certificate": result["cert_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}" if artifacts: - result["cert_hash"] = _get_hash(result["cert_link"]).hex() + result["cert_hash"] = _get_hash(result["cert_link"]) elif file["file_label"] == "Security Target": result["target_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}" if artifacts: - result["target_hash"] = _get_hash(result["target_link"]).hex() + result["target_hash"] = _get_hash(result["target_link"]) elif file["file_label"] == "Assurance Activity Report (AAR)": result["aar_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}" if artifacts: - result["aar_hash"] = _get_hash(result["aar_link"]).hex() + result["aar_hash"] = _get_hash(result["aar_link"]) elif file["file_label"] == "Administrative Guide (AGD)": result["agd_link"] = constants.CC_USA_GETFILE_URL + f"?file_id={file['file_id']}" if artifacts: - result["agd_hash"] = _get_hash(result["agd_link"]).hex() + result["agd_hash"] = _get_hash(result["agd_link"]) return result @@ -1653,6 +1695,7 @@ def map_cert(cert, files=None): # noqa: C901 results = [] offset = 0 got = 0 + pbar = tqdm(desc=f"Get US scheme {name}.") while True: resp = _getq( constants.CC_USA_PRODUCTS_URL, @@ -1673,10 +1716,12 @@ def map_cert(cert, files=None): # noqa: C901 session, ) files = resp.json() + pbar.update() results.append(map_cert(cert, files)) offset += 100 if got >= count: break + pbar.close() return results @@ -1691,9 +1736,7 @@ def get_usa_certified( # noqa: C901 :return: The entries. 
""" return _get_usa( - {"certification_status": "Certified", "publish_status": "Published"}, - enhanced, - artifacts, + {"certification_status": "Certified", "publish_status": "Published"}, enhanced, artifacts, "certified" ) @@ -1703,7 +1746,7 @@ def get_usa_in_evaluation() -> list[dict[str, Any]]: :return: The entries. """ - return _get_usa({"status": "In Progress", "publish_status": "Published"}, False, False) + return _get_usa({"status": "In Progress", "publish_status": "Published"}, False, False, "in evaluation") def get_usa_archived() -> list[dict[str, Any]]: @@ -1712,7 +1755,7 @@ def get_usa_archived() -> list[dict[str, Any]]: :return: The entries. """ - return _get_usa({"status": "Archived", "publish_status": "Published"}, False, False) + return _get_usa({"status": "Archived", "publish_status": "Published"}, False, False, "archived") class EntryType(Enum): @@ -1817,7 +1860,9 @@ def to_dict(self): } @classmethod - def from_web(cls, scheme: str, entry_types: Iterable[EntryType]) -> CCScheme: + def from_web( + cls, scheme: str, entry_types: Iterable[EntryType], enhanced: bool | None = None, artifacts: bool | None = None + ) -> CCScheme: if not (scheme_lists := cls.methods.get(scheme)): raise ValueError("Unknown scheme.") entries = {} @@ -1825,5 +1870,11 @@ def from_web(cls, scheme: str, entry_types: Iterable[EntryType]) -> CCScheme: for each_type in entry_types: if not (method := scheme_lists.get(each_type)): raise ValueError("Wrong entry_type for scheme.") - entries[each_type] = method() + sig = signature(method) + args = {} + if enhanced is not None and "enhanced" in sig.parameters: + args["enhanced"] = enhanced + if artifacts is not None and "artifacts" in sig.parameters: + args["artifacts"] = artifacts + entries[each_type] = method(**args) return cls(scheme, timestamp, entries)