diff --git a/README.md b/README.md
index 258725c2..41529529 100644
--- a/README.md
+++ b/README.md
@@ -68,8 +68,8 @@
 df_2015_and_newer = df.loc[df.year_from > 2014]
 df.year_from.value_counts().sort_index().plot.line()
 ```
 
-
+![](docs/_static/logolink_OP_VVV_hor_barva_eng.jpg)
diff --git a/notebooks/fips/vulnerabilities.ipynb b/notebooks/fips/vulnerabilities.ipynb
index d00cccde..e938a747 100644
--- a/notebooks/fips/vulnerabilities.ipynb
+++ b/notebooks/fips/vulnerabilities.ipynb
@@ -2,9 +2,14 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 17,
+   "execution_count": 1,
    "id": "41674b9c",
-   "metadata": {},
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2024-10-17T09:37:51.724995Z",
+     "start_time": "2024-10-17T09:37:51.033775Z"
+    }
+   },
    "outputs": [],
    "source": [
     "from sec_certs.dataset.fips import FIPSDataset\n",
@@ -21,7 +26,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 2,
+   "execution_count": 3,
    "id": "5ee5dca5",
    "metadata": {},
    "outputs": [
@@ -29,18 +34,35 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Downloading FIPS dataset: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 107M/107M [00:12<00:00, 8.88MB/s]\n",
-      "100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 21/21 [00:10<00:00, 1.95it/s]\n",
-      "Building CVEDataset from jsons: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 21/21 [00:15<00:00, 1.37it/s]\n",
-      "parsing cpe matching (by NIST) dictionary: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 400508/400508 [00:26<00:00, 15291.66it/s]\n",
-      "Building-up lookup dictionaries for fast CVE matching: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████| 198239/198239 [00:09<00:00, 20313.13it/s]\n"
+      "Downloading FIPS Dataset: 61.7MB [00:12, 5.09MB/s]\n"
      ]
     }
    ],
    "source": [
     "dset = FIPSDataset.from_web_latest()\n",
-    "cve_dset: CVEDataset = dset._prepare_cve_dataset()\n",
-    "cpe_dset: CPEDataset = dset._prepare_cpe_dataset()"
+    "#cve_dset: CVEDataset = dset._prepare_cve_dataset()\n",
+    "#cpe_dset: CPEDataset = dset._prepare_cpe_dataset()"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "303824ee-a101-492d-8505-3e1f96a04d69",
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "PosixPath('/this/is/dummy/nonexisting/path/auxiliary_datasets/cpe_dataset.json')"
+      ]
+     },
+     "execution_count": 5,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "dset.cpe_dataset_path"
+   ]
+  },
   {
@@ -568,7 +590,7 @@
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "Python 3.8.13 ('venv': venv)",
+   "display_name": "Python 3 (ipykernel)",
    "language": "python",
    "name": "python3"
   },
@@ -582,7 +604,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.13"
+   "version": "3.12.6"
   },
   "vscode": {
    "interpreter": {
diff --git a/src/sec_certs/configuration.py b/src/sec_certs/configuration.py
index 47ac5821..0305bdea 100644
--- a/src/sec_certs/configuration.py
+++ b/src/sec_certs/configuration.py
@@ -31,7 +31,9 @@ class Configuration(BaseSettings):
         description=" During validation we don't connect certificates with validation dates difference higher than _this_.",
     )
     n_threads: int = Field(
-        -1, description="How many threads to use for parallel computations. Set to -1 to use all logical cores.", ge=-1
+        -1,
+        description="How many threads to use for parallel computations. Set to -1 to use all logical cores.",
+        ge=-1,
     )
     cpe_matching_threshold: int = Field(
         92,
@@ -40,12 +42,18 @@ class Configuration(BaseSettings):
         le=100,
     )
     cpe_n_max_matches: int = Field(
-        99, description="Maximum number of candidate CPE items that may be related to given certificate, >0", gt=0
+        99,
+        description="Maximum number of candidate CPE items that may be related to given certificate, >0",
+        gt=0,
    )
     cc_latest_snapshot: AnyHttpUrl = Field(
         "https://sec-certs.org/cc/dataset.json",
         description="URL from where to fetch the latest snapshot of fully processed CC dataset.",
     )
+    cc_latest_full_archive: AnyHttpUrl = Field(
+        "https://sec-certs.org/cc/cc.tar.gz",
+        description="URL from where to fetch the latest full archive of fully processed CC dataset.",
+    )
     cc_maintenances_latest_snapshot: AnyHttpUrl = Field(
         "https://sec-certs.org/cc/maintenance_updates.json",
         description="URL from where to fetch the latest snapshot of CC maintenance updates",
@@ -55,25 +63,36 @@ class Configuration(BaseSettings):
         description="URL from where to fetch the latest snapshot of the PP dataset.",
     )
     fips_latest_snapshot: AnyHttpUrl = Field(
-        "https://sec-certs.org/fips/dataset.json", description="URL for the latest snapshot of FIPS dataset."
+        "https://sec-certs.org/fips/dataset.json",
+        description="URL for the latest snapshot of FIPS dataset.",
+    )
+    fips_latest_full_archive: AnyHttpUrl = Field(
+        "https://sec-certs.org/fips/fips.tar.gz",
+        description="URL from where to fetch the latest full archive of fully processed FIPS dataset.",
     )
     fips_iut_dataset: AnyHttpUrl = Field(
-        "https://sec-certs.org/fips/iut/dataset.json", description="URL for the dataset of FIPS IUT data."
+        "https://sec-certs.org/fips/iut/dataset.json",
+        description="URL for the dataset of FIPS IUT data.",
     )
     fips_iut_latest_snapshot: AnyHttpUrl = Field(
-        "https://sec-certs.org/fips/iut/latest.json", description="URL for the latest snapshot of FIPS IUT data."
+        "https://sec-certs.org/fips/iut/latest.json",
+        description="URL for the latest snapshot of FIPS IUT data.",
     )
     fips_mip_dataset: AnyHttpUrl = Field(
-        "https://sec-certs.org/fips/mip/dataset.json", description="URL for the dataset of FIPS MIP data"
+        "https://sec-certs.org/fips/mip/dataset.json",
+        description="URL for the dataset of FIPS MIP data",
     )
     fips_mip_latest_snapshot: AnyHttpUrl = Field(
-        "https://sec-certs.org/fips/mip/latest.json", description="URL for the latest snapshot of FIPS MIP data"
+        "https://sec-certs.org/fips/mip/latest.json",
+        description="URL for the latest snapshot of FIPS MIP data",
     )
     cpe_latest_snapshot: AnyHttpUrl = Field(
-        "https://sec-certs.org/vuln/cpe/cpe.json.gz", description="URL for the latest snapshot of CPEDataset."
+        "https://sec-certs.org/vuln/cpe/cpe.json.gz",
+        description="URL for the latest snapshot of CPEDataset.",
     )
     cve_latest_snapshot: AnyHttpUrl = Field(
-        "https://sec-certs.org/vuln/cve/cve.json.gz", description="URL for the latest snapshot of CVEDataset."
+ "https://sec-certs.org/vuln/cve/cve.json.gz", + description="URL for the latest snapshot of CVEDataset.", ) cpe_match_latest_snapshot: AnyHttpUrl = Field( "https://sec-certs.org/vuln/cpe/cpe_match.json.gz", @@ -91,14 +110,16 @@ class Configuration(BaseSettings): ge=0, ) ignore_first_page: bool = Field( - True, description="During keyword search, first page usually contains addresses - ignore it." + True, + description="During keyword search, first page usually contains addresses - ignore it.", ) cc_reference_annotator_dir: Optional[Path] = Field( # noqa: UP007 None, description="Path to directory with serialized reference annotator model. If set to `null`, tool will search default directory for the given dataset.", ) cc_reference_annotator_should_train: bool = Field( - True, description="True if new reference annotator model shall be build, False otherwise." + True, + description="True if new reference annotator model shall be build, False otherwise.", ) cc_matching_threshold: int = Field( 90, @@ -109,14 +130,15 @@ class Configuration(BaseSettings): cc_use_proxy: bool = Field(False, description="Download CC artifacts through the sec-certs.org proxy.") fips_use_proxy: bool = Field(False, description="Download FIPS artifacts through the sec-certs.org proxy.") enable_progress_bars: bool = Field( - True, description="If true, progress bars will be printed to stdout during computation." + True, + description="If true, progress bars will be printed to stdout during computation.", ) nvd_api_key: Optional[str] = Field(None, description="NVD API key for access to CVEs and CPEs.") # noqa: UP007 preferred_source_nvd_datasets: Literal["sec-certs", "api"] = Field( "sec-certs", description="If set to `sec-certs`, will fetch CPE and CVE datasets from sec-certs.org." + " If set to `api`, will fetch these resources from NVD API. It is advised to set an" - + " `nvd_api_key` when setting this to `nvd`.", + + " `nvd_api_key` when setting this to `api`.", ) def _get_nondefault_keys(self) -> set[str]: diff --git a/src/sec_certs/constants.py b/src/sec_certs/constants.py index 956f8fb7..b1c25fe5 100644 --- a/src/sec_certs/constants.py +++ b/src/sec_certs/constants.py @@ -7,6 +7,7 @@ REF_EMBEDDING_METHOD = Literal["tf_idf", "transformer"] +# This stupid thing should die in a fire... 
 DUMMY_NONEXISTING_PATH = Path("/this/is/dummy/nonexisting/path")
 
 RESPONSE_OK = 200
diff --git a/src/sec_certs/dataset/cc.py b/src/sec_certs/dataset/cc.py
index ebffd912..f5498745 100644
--- a/src/sec_certs/dataset/cc.py
+++ b/src/sec_certs/dataset/cc.py
@@ -80,13 +80,21 @@ def to_pandas(self) -> pd.DataFrame:
         """
         Return self serialized into pandas DataFrame
         """
-        df = pd.DataFrame([x.pandas_tuple for x in self.certs.values()], columns=CCCertificate.pandas_columns)
+        df = pd.DataFrame(
+            [x.pandas_tuple for x in self.certs.values()],
+            columns=CCCertificate.pandas_columns,
+        )
         df = df.set_index("dgst")
 
         df.not_valid_before = pd.to_datetime(df.not_valid_before, errors="coerce")
         df.not_valid_after = pd.to_datetime(df.not_valid_after, errors="coerce")
         df = df.astype(
-            {"category": "category", "status": "category", "scheme": "category", "cert_lab": "category"}
+            {
+                "category": "category",
+                "status": "category",
+                "scheme": "category",
+                "cert_lab": "category",
+            }
         ).fillna(value=np.nan)
         df = df.loc[
             ~df.manufacturer.isnull()
@@ -212,7 +220,10 @@ def scheme_dataset_path(self) -> Path:
         "cc_pp_collaborative.html": BASE_URL + "/pps/collaborativePP.cfm?cpp=1",
         "cc_pp_archived.html": BASE_URL + "/pps/index.cfm?archived=1",
     }
-    PP_CSV = {"cc_pp_active.csv": BASE_URL + "/pps/pps.csv", "cc_pp_archived.csv": BASE_URL + "/pps/pps-archived.csv"}
+    PP_CSV = {
+        "cc_pp_active.csv": BASE_URL + "/pps/pps.csv",
+        "cc_pp_archived.csv": BASE_URL + "/pps/pps-archived.csv",
+    }
 
     @property
     def active_html_tuples(self) -> list[tuple[str, Path]]:
@@ -247,11 +258,33 @@ def archived_csv_tuples(self) -> list[tuple[str, Path]]:
         return [(x, self.web_dir / y) for y, x in self.CSV_PRODUCTS_URL.items() if "archived" in y]
 
     @classmethod
-    def from_web_latest(cls) -> CCDataset:
+    def from_web_latest(
+        cls,
+        path: str | Path | None = None,
+        auxiliary_datasets: bool = False,
+        artifacts: bool = False,
+    ) -> CCDataset:
         """
-        Fetches the fresh snapshot of CCDataset from sec-certs.org
+        Fetches the fresh snapshot of CCDataset from sec-certs.org.
+
+        Optionally stores it at the given path (a directory) and also downloads auxiliary datasets and artifacts (PDFs).
+
+        :::{note}
+        Including the auxiliary datasets adds several gigabytes; including the artifacts adds tens of gigabytes.
+        :::
+
+        :param path: Path to a directory where to store the dataset, or `None` if it should not be stored.
+        :param auxiliary_datasets: Whether to also download auxiliary datasets (CVE, CPE, CPEMatch datasets).
+        :param artifacts: Whether to also download artifacts (i.e. PDFs).
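+
+        Example (illustrative only; the target directory is arbitrary):
+
+        >>> dset = CCDataset.from_web_latest()  # snapshot only, not stored locally
+        >>> dset = CCDataset.from_web_latest(path="./cc_dset", auxiliary_datasets=True, artifacts=True)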
""" - return cls.from_web(config.cc_latest_snapshot, "Downloading CC Dataset", "cc_latest_dataset.json") + return cls.from_web( + config.cc_latest_full_archive, + config.cc_latest_snapshot, + "Downloading CC", + path, + auxiliary_datasets, + artifacts, + ) def _set_local_paths(self): super()._set_local_paths() @@ -262,6 +295,9 @@ def _set_local_paths(self): if self.auxiliary_datasets.mu_dset: self.auxiliary_datasets.mu_dset.root_dir = self.mu_dataset_dir + if self.auxiliary_datasets.scheme_dset: + self.auxiliary_datasets.scheme_dset.json_path = self.scheme_dataset_path + for cert in self: cert.set_local_paths( self.reports_pdf_dir, @@ -271,7 +307,6 @@ def _set_local_paths(self): self.targets_txt_dir, self.certificates_txt_dir, ) - # TODO: This forgets to set local paths for other auxiliary datasets def _merge_certs(self, certs: dict[str, CCCertificate], cert_source: str | None = None) -> None: """ @@ -308,7 +343,11 @@ def _download_csv_html_resources(self, get_active: bool = True, get_archived: bo @serialize @staged(logger, "Downloading and processing CSV and HTML files of certificates.") def get_certs_from_web( - self, to_download: bool = True, keep_metadata: bool = True, get_active: bool = True, get_archived: bool = True + self, + to_download: bool = True, + keep_metadata: bool = True, + get_active: bool = True, + get_archived: bool = True, ) -> None: """ Downloads CSV and HTML files that hold lists of certificates from common criteria website. Parses these files @@ -410,7 +449,10 @@ def _get_primary_key_str(row: Tag): ["not_valid_before", "not_valid_after", "maintenance_date"] ].apply(pd.to_datetime, errors="coerce") - df["dgst"] = df.apply(lambda row: helpers.get_first_16_bytes_sha256(_get_primary_key_str(row)), axis=1) + df["dgst"] = df.apply( + lambda row: helpers.get_first_16_bytes_sha256(_get_primary_key_str(row)), + axis=1, + ) df_base = df.loc[~df.is_maintenance].copy() df_main = df.loc[df.is_maintenance].copy() @@ -444,7 +486,10 @@ def _get_primary_key_str(row: Tag): for x in df_main.itertuples(): updates[x.dgst].add( CCCertificate.MaintenanceReport( - x.maintenance_date.date(), x.maintenance_title, x.maintenance_report_link, x.maintenance_st_link + x.maintenance_date.date(), + x.maintenance_title, + x.maintenance_report_link, + x.maintenance_st_link, ) ) @@ -538,7 +583,22 @@ def _parse_table( cert_status = "active" if "active" in str(file) else "archived" - cc_cat_abbreviations = ["AC", "BP", "DP", "DB", "DD", "IC", "KM", "MD", "MF", "NS", "OS", "OD", "DG", "TC"] + cc_cat_abbreviations = [ + "AC", + "BP", + "DP", + "DB", + "DD", + "IC", + "KM", + "MD", + "MF", + "NS", + "OS", + "OD", + "DG", + "TC", + ] cc_table_ids = ["tbl" + x for x in cc_cat_abbreviations] cc_categories = [ "Access Control Devices and Systems", @@ -774,18 +834,27 @@ def extract_data(self) -> None: self._extract_pdf_frontpage() self._extract_pdf_keywords() - @staged(logger, "Computing heuristics: Deriving information about laboratories involved in certification.") + @staged( + logger, + "Computing heuristics: Deriving information about laboratories involved in certification.", + ) def _compute_cert_labs(self) -> None: certs_to_process = [x for x in self if x.state.report.is_ok_to_analyze()] for cert in certs_to_process: cert.compute_heuristics_cert_lab() - @staged(logger, "Computing heuristics: Deriving information about certificate ids from artifacts.") + @staged( + logger, + "Computing heuristics: Deriving information about certificate ids from artifacts.", + ) def _compute_normalized_cert_ids(self) -> 
         for cert in self:
             cert.compute_heuristics_cert_id()
 
-    @staged(logger, "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.")
+    @staged(
+        logger,
+        "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.",
+    )
     def _compute_transitive_vulnerabilities(self):
         transitive_cve_finder = TransitiveVulnerabilityFinder(lambda cert: cert.heuristics.cert_id)
         transitive_cve_finder.fit(self.certs, lambda cert: cert.heuristics.report_references)
@@ -851,7 +920,11 @@ def func(cert):
         finder.fit(self.certs, lambda cert: cert.heuristics.cert_id, ref_lookup(kw_source))  # type: ignore
 
         for dgst in self.certs:
-            setattr(self.certs[dgst].heuristics, dep_attr, finder.predict_single_cert(dgst, keep_unknowns=False))
+            setattr(
+                self.certs[dgst].heuristics,
+                dep_attr,
+                finder.predict_single_cert(dgst, keep_unknowns=False),
+            )
 
     @serialize
     def process_auxiliary_datasets(self, download_fresh: bool = False) -> None:
@@ -910,7 +983,9 @@ def process_maintenance_updates(self, to_download: bool = True) -> CCDatasetMain
                 itertools.chain.from_iterable(CCMaintenanceUpdate.get_updates_from_cc_cert(x) for x in maintained_certs)
             )
             update_dset = CCDatasetMaintenanceUpdates(
-                {x.dgst: x for x in updates}, root_dir=self.mu_dataset_dir, name="maintenance_updates"
+                {x.dgst: x for x in updates},
+                root_dir=self.mu_dataset_dir,
+                name="maintenance_updates",
             )
         else:
             update_dset = CCDatasetMaintenanceUpdates.from_json(self.mu_dataset_path)
@@ -978,18 +1053,28 @@ def analyze_certificates(self) -> None:
         raise NotImplementedError
 
     def get_certs_from_web(
-        self, to_download: bool = True, keep_metadata: bool = True, get_active: bool = True, get_archived: bool = True
+        self,
+        to_download: bool = True,
+        keep_metadata: bool = True,
+        get_active: bool = True,
+        get_archived: bool = True,
     ) -> None:
         raise NotImplementedError
 
     @classmethod
     def from_json(cls, input_path: str | Path, is_compressed: bool = False) -> CCDatasetMaintenanceUpdates:
-        dset = cast(CCDatasetMaintenanceUpdates, ComplexSerializableType.from_json(input_path, is_compressed))
+        dset = cast(
+            CCDatasetMaintenanceUpdates,
+            ComplexSerializableType.from_json(input_path, is_compressed),
+        )
         dset._root_dir = Path(input_path).parent.absolute()
         return dset
 
     def to_pandas(self) -> pd.DataFrame:
-        df = pd.DataFrame([x.pandas_tuple for x in self.certs.values()], columns=CCMaintenanceUpdate.pandas_columns)
+        df = pd.DataFrame(
+            [x.pandas_tuple for x in self.certs.values()],
+            columns=CCMaintenanceUpdate.pandas_columns,
+        )
         df = df.set_index("dgst")
         df.index.name = "dgst"
@@ -997,11 +1082,29 @@ def to_pandas(self) -> pd.DataFrame:
         return df.fillna(value=np.nan)
 
     @classmethod
-    def from_web_latest(cls) -> CCDatasetMaintenanceUpdates:
+    def from_web_latest(
+        cls,
+        path: str | Path | None = None,
+        auxiliary_datasets: bool = False,
+        artifacts: bool = False,
+    ) -> CCDatasetMaintenanceUpdates:
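+        """
+        Fetches the fresh snapshot of the maintenance updates dataset from sec-certs.org.
+
+        Optionally stores it at the given path (a directory). Auxiliary datasets and
+        artifacts are not supported here and raise a ValueError.
+        """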
+        if auxiliary_datasets or artifacts:
+            raise ValueError(
+                "Maintenance update dataset does not support downloading artifacts or other auxiliary datasets."
+            )
+        if path:
+            path = Path(path)
+            if not path.exists():
+                path.mkdir(parents=True)
+            if not path.is_dir():
+                raise ValueError("Path needs to be a directory.")
         with tempfile.TemporaryDirectory() as tmp_dir:
-            dset_path = Path(tmp_dir) / "cc_maintenances_latest_dataset.json"
+            dset_path = Path(tmp_dir) / "maintenance_updates.json"
             helpers.download_file(config.cc_maintenances_latest_snapshot, dset_path)
-            return cls.from_json(dset_path)
+            dset = cls.from_json(dset_path)
+            if path:
+                dset.move_dataset(path)
+            return dset
 
     def get_n_maintenances_df(self) -> pd.DataFrame:
         """
diff --git a/src/sec_certs/dataset/dataset.py b/src/sec_certs/dataset/dataset.py
index 218936c4..2da2bdc4 100644
--- a/src/sec_certs/dataset/dataset.py
+++ b/src/sec_certs/dataset/dataset.py
@@ -6,6 +6,7 @@
 import logging
 import re
 import shutil
+import tarfile
 import tempfile
 from abc import ABC, abstractmethod
 from collections.abc import Iterator
@@ -23,9 +24,17 @@
 from sec_certs.model.cpe_matching import CPEClassifier
 from sec_certs.sample.certificate import Certificate
 from sec_certs.sample.cpe import CPE
-from sec_certs.serialization.json import ComplexSerializableType, get_class_fullname, serialize
+from sec_certs.serialization.json import (
+    ComplexSerializableType,
+    get_class_fullname,
+    serialize,
+)
 from sec_certs.utils import helpers
-from sec_certs.utils.nvd_dataset_builder import CpeMatchNvdDatasetBuilder, CpeNvdDatasetBuilder, CveNvdDatasetBuilder
+from sec_certs.utils.nvd_dataset_builder import (
+    CpeMatchNvdDatasetBuilder,
+    CpeNvdDatasetBuilder,
+    CveNvdDatasetBuilder,
+)
 from sec_certs.utils.profiling import staged
 from sec_certs.utils.tqdm import tqdm
@@ -170,16 +179,74 @@ def __str__(self) -> str:
         return str(type(self).__name__) + ":" + self.name + ", " + str(len(self)) + " certificates"
 
     @classmethod
-    def from_web(cls: type[DatasetSubType], url: str, progress_bar_desc: str, filename: str) -> DatasetSubType:
-        """
-        Fetches a fully processed dataset instance from static site that hosts it.
-        """
-        with tempfile.TemporaryDirectory() as tmp_dir:
-            dset_path = Path(tmp_dir) / filename
-            helpers.download_file(url, dset_path, show_progress_bar=True, progress_bar_desc=progress_bar_desc)
-            dset = cls.from_json(dset_path)
-            dset.root_dir = constants.DUMMY_NONEXISTING_PATH
-            return dset
+    def from_web(  # noqa
+        cls: type[DatasetSubType],
+        archive_url: str,
+        snapshot_url: str,
+        progress_bar_desc: str,
+        path: None | str | Path = None,
+        auxiliary_datasets: bool = False,
+        artifacts: bool = False,
+    ) -> DatasetSubType:
+        """
+        Fetches the fresh dataset snapshot from sec-certs.org.
+
+        Optionally stores it at the given path (a directory) and also downloads auxiliary datasets and artifacts (PDFs).
+
+        :::{note}
+        Including the auxiliary datasets adds several gigabytes; including the artifacts adds tens of gigabytes.
+        :::
+
+        :param archive_url: The URL of the full dataset archive.
+        :param snapshot_url: The URL of the processed dataset JSON snapshot.
+        :param progress_bar_desc: Description of the download progress bar.
+        :param path: Path to a directory where to store the dataset, or `None` if it should not be stored.
+        :param auxiliary_datasets: Whether to also download auxiliary datasets (CVE, CPE, CPEMatch datasets).
+        :param artifacts: Whether to also download artifacts (i.e. PDFs).
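+
+        Example (a sketch; the argument values mirror the CCDataset.from_web_latest wrapper):
+
+        >>> CCDataset.from_web(
+        ...     config.cc_latest_full_archive,
+        ...     config.cc_latest_snapshot,
+        ...     "Downloading CC",
+        ...     path="./dset",
+        ...     auxiliary_datasets=True,
+        ...     artifacts=True,
+        ... )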
+ """ + if (artifacts or auxiliary_datasets) and path is None: + raise ValueError("Path needs to be defined if artifacts or auxiliary datasets are to be downloaded.") + if artifacts and not auxiliary_datasets: + raise ValueError("Auxiliary datasets need to be downloaded if artifacts are to be downloaded.") + if path is not None: + path = Path(path) + if not path.exists(): + path.mkdir(parents=True) + if not path.is_dir(): + raise ValueError("Path needs to be a directory.") + if artifacts: + with tempfile.TemporaryDirectory() as tmp_dir: + dset_path = Path(tmp_dir) / "dataset.tar.gz" + res = helpers.download_file( + archive_url, + dset_path, + show_progress_bar=True, + progress_bar_desc=progress_bar_desc, + ) + if res != constants.RESPONSE_OK: + raise ValueError(f"Download failed: {res}") + with tarfile.open(dset_path, "r:gz") as tar: + tar.extractall(str(path)) + dset = cls.from_json(path / "dataset.json") # type: ignore + if auxiliary_datasets: + dset.process_auxiliary_datasets(download_fresh=False) + else: + with tempfile.TemporaryDirectory() as tmp_dir: + dset_path = Path(tmp_dir) / "dataset.json" + helpers.download_file( + snapshot_url, + dset_path, + show_progress_bar=True, + progress_bar_desc=progress_bar_desc, + ) + dset = cls.from_json(dset_path) + if path: + dset.move_dataset(path) + else: + dset.root_dir = constants.DUMMY_NONEXISTING_PATH + if auxiliary_datasets: + dset.process_auxiliary_datasets(download_fresh=True) + return dset def to_dict(self) -> dict[str, Any]: return { @@ -204,7 +271,10 @@ def from_dict(cls: type[DatasetSubType], dct: dict) -> DatasetSubType: @classmethod def from_json(cls: type[DatasetSubType], input_path: str | Path, is_compressed: bool = False) -> DatasetSubType: - dset = cast("DatasetSubType", ComplexSerializableType.from_json(input_path, is_compressed)) + dset = cast( + "DatasetSubType", + ComplexSerializableType.from_json(input_path, is_compressed), + ) dset._root_dir = Path(input_path).parent.absolute() dset._set_local_paths() return dset @@ -411,7 +481,7 @@ def _prepare_cpe_match_dict(self, download_fresh: bool = False) -> dict: if download_fresh: if config.preferred_source_nvd_datasets == "api": - logger.info("Fetchnig CPE Match feed from NVD APi.") + logger.info("Fetching CPE Match feed from NVD APi.") with CpeMatchNvdDatasetBuilder(api_key=config.nvd_api_key) as builder: cpe_match_dict = builder.build_dataset(cpe_match_dict) else: @@ -444,8 +514,16 @@ def compute_cpe_heuristics(self) -> CPEClassifier: Computes matching CPEs for the certificates. 
""" WINDOWS_WEAK_CPES: set[CPE] = { - CPE("", "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x64:*", "Microsoft Windows on X64"), - CPE("", "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x86:*", "Microsoft Windows on X86"), + CPE( + "", + "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x64:*", + "Microsoft Windows on X64", + ), + CPE( + "", + "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x86:*", + "Microsoft Windows on X86", + ), } def filter_condition(cpe: CPE) -> bool: diff --git a/src/sec_certs/dataset/fips.py b/src/sec_certs/dataset/fips.py index 77f38754..eeaec0a0 100644 --- a/src/sec_certs/dataset/fips.py +++ b/src/sec_certs/dataset/fips.py @@ -18,7 +18,9 @@ from sec_certs.dataset.dataset import AuxiliaryDatasets, Dataset from sec_certs.dataset.fips_algorithm import FIPSAlgorithmDataset from sec_certs.model.reference_finder import ReferenceFinder -from sec_certs.model.transitive_vulnerability_finder import TransitiveVulnerabilityFinder +from sec_certs.model.transitive_vulnerability_finder import ( + TransitiveVulnerabilityFinder, +) from sec_certs.sample.fips import FIPSCertificate from sec_certs.serialization.json import ComplexSerializableType, serialize from sec_certs.utils import helpers @@ -215,11 +217,33 @@ def _get_certificates_from_html(self, html_file: Path) -> list[FIPSCertificate]: return [FIPSCertificate(int(cert_id)) for cert_id in cert_ids] @classmethod - def from_web_latest(cls) -> FIPSDataset: + def from_web_latest( + cls, + path: str | Path | None = None, + auxiliary_datasets: bool = False, + artifacts: bool = False, + ) -> FIPSDataset: """ - Fetches the fresh snapshot of FIPSDataset from mirror. + Fetches the fresh snapshot of FIPSDataset from sec-certs.org. + + Optionally stores it at the given path (a directory) and also downloads auxiliary datasets and artifacts (PDFs). + + :::{note} + Note that including the auxiliary datasets adds several gigabytes and including artifacts adds tens of gigabytes. + ::: + + :param path: Path to a directory where to store the dataset, or `None` if it should not be stored. + :param auxiliary_datasets: Whether to also download auxiliary datasets (CVE, CPE, CPEMatch datasets). + :param artifacts: Whether to also download artifacts (i.e. PDFs). 
""" - return cls.from_web(config.fips_latest_snapshot, "Downloading FIPS Dataset", "fips_latest_dataset.json") + return cls.from_web( + config.fips_latest_full_archive, + config.fips_latest_snapshot, + "Downloading FIPS", + path, + auxiliary_datasets, + artifacts, + ) def _set_local_paths(self) -> None: super()._set_local_paths() @@ -283,7 +307,10 @@ def _extract_policy_pdf_metadata(self) -> None: ) self.update_with_certs(processed_certs) - @staged(logger, "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.") + @staged( + logger, + "Computing heuristics: Transitive vulnerabilities in referenc(ed/ing) certificates.", + ) def _compute_transitive_vulnerabilities(self) -> None: transitive_cve_finder = TransitiveVulnerabilityFinder(lambda cert: str(cert.cert_id)) transitive_cve_finder.fit(self.certs, lambda cert: cert.heuristics.policy_processed_references) @@ -306,12 +333,16 @@ def _compute_references(self, keep_unknowns: bool = False) -> None: policy_reference_finder = ReferenceFinder() policy_reference_finder.fit( - self.certs, lambda cert: str(cert.cert_id), lambda cert: cert.heuristics.policy_prunned_references + self.certs, + lambda cert: str(cert.cert_id), + lambda cert: cert.heuristics.policy_prunned_references, ) module_reference_finder = ReferenceFinder() module_reference_finder.fit( - self.certs, lambda cert: str(cert.cert_id), lambda cert: cert.heuristics.module_prunned_references + self.certs, + lambda cert: str(cert.cert_id), + lambda cert: cert.heuristics.module_prunned_references, ) for cert in self: @@ -323,7 +354,10 @@ def _compute_references(self, keep_unknowns: bool = False) -> None: ) def to_pandas(self) -> pd.DataFrame: - df = pd.DataFrame([x.pandas_tuple for x in self.certs.values()], columns=FIPSCertificate.pandas_columns) + df = pd.DataFrame( + [x.pandas_tuple for x in self.certs.values()], + columns=FIPSCertificate.pandas_columns, + ) df = df.set_index("dgst") df.date_validation = pd.to_datetime(df.date_validation, errors="coerce") @@ -333,7 +367,12 @@ def to_pandas(self) -> pd.DataFrame: df = df.loc[~(df.embodiment == "*")] df = df.astype( - {"type": "category", "status": "category", "standard": "category", "embodiment": "category"} + { + "type": "category", + "status": "category", + "standard": "category", + "embodiment": "category", + } ).fillna(value=np.nan) df.level = df.level.fillna(value=np.nan).astype("float")