Update auto_skip_checkov.py
Signed-off-by: ҉αkα x⠠⠵ <[email protected]>
4k4xs4pH1r3 authored Dec 13, 2024
1 parent 5ad196a commit 60e0e44
Showing 1 changed file with 195 additions and 121 deletions.
316 changes: 195 additions & 121 deletions DevSecOps/auto_skip_checkov.py
@@ -1,141 +1,215 @@
"""Add Checkov skip comments."""

"""Auto-add skip comments based on Checkov findings."""
import glob
import logging
from typing import Dict, Set
import tempfile
from typing import Dict, List, Optional, Tuple, cast

from tqdm import tqdm # type: ignore

# Logging setup: log to a temporary file so reprocess_from_log can read it back.
with tempfile.NamedTemporaryFile(
    mode="w", delete=False
) as tmp_file:
    LOG_FILE = tmp_file.name
logging.basicConfig(
    filename=LOG_FILE,
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Constants
FRS = "FAILED for resource"
# Deprecated Python runtime versions and their deprecation dates.
DATES: Dict[str, str] = {
    "python3.7": "2023-06-05",
    "python3.6": "2021-12-23",
}
SKIP_COMMENT_PREFIX = "# checkov:skip="


def remove_duplicate_skips(lines: List[str]) -> List[str]:
    """Remove duplicate skip comments."""
    return [
        line
        for i, line in enumerate(lines)
        if not (
            line.startswith(SKIP_COMMENT_PREFIX)
            and i > 0
            and lines[i - 1].startswith(SKIP_COMMENT_PREFIX)
            and line == lines[i - 1]
        )
    ]


def add_skip_comment(
    filename: str, checkov_id: str, start_line: int
) -> None:
    """Add skip comment to file."""
    try:
        with open(filename, "r+", encoding="utf-8") as file:
            lines = file.readlines()
            skip_comment = f"{SKIP_COMMENT_PREFIX}{checkov_id}\n"
            # Only insert if the target line is not already this skip comment.
            if skip_comment not in lines[start_line:start_line + 1]:
                lines.insert(start_line, skip_comment)
                logging.info(
                    "Updated: %s - line %s", filename, start_line + 1
                )
                file.seek(0)
                file.writelines(lines)
                file.truncate()
            else:
                logging.info(
                    "Skip exists: %s - %s", filename, checkov_id
                )
    except (OSError, IndexError) as error:
        logging.exception(
            "Add skip error: %s - %s", filename, error
        )


def extract_finding_info(
    lines: List[str], lineno: int
) -> Tuple[Optional[str], Optional[str], Optional[int]]:
    """Extract finding info from log."""
    try:
        # The line above the match carries the check ID after the first ':';
        # the line below carries "File: <path>:<start>-<end>".
        _, checkov_id = lines[lineno - 1].split(":", 1)
        file_path, line_range = lines[lineno + 1].split(":")[1:3]
        start_line = int(line_range.split("-")[0])
        return checkov_id.strip(), file_path.strip(), start_line
    except (IndexError, ValueError) as error:
        log_line = lines[lineno] if 0 <= lineno < len(lines) else "Out of range"
        logging.error(
            "Extract info error: %s - %s - %s", lineno, log_line, error
        )
        return None, None, None


def check_deprecated_version(line: str) -> Optional[str]:
    """Checks for deprecated versions."""
    return next((v for v in DATES if v in line), None)


def process_ckv_aws_363_finding(
    filename: str, file_path: str, start_line: int
) -> None:
    """Process a single CKV_AWS_363 finding."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            file_lines = f.readlines()
    except FileNotFoundError:
        logging.error("File not found: %s", file_path)
        return

    line_to_check = (
        file_lines[start_line - 1]
        if 0 < start_line <= len(file_lines)
        else ""
    )
    dep_ver = check_deprecated_version(line_to_check)
    if dep_ver:
        add_skip_comment(file_path, "CKV_AWS_363", start_line - 1)
        logging.info(
            "Skip added for deprecated version: %s in %s:%s",
            dep_ver, filename, start_line,
        )
    else:
        logging.info(
            "Not a deprecated version check in %s:%s",
            filename, start_line,
        )


def process_findings(filename: str, lines: List[str]) -> None:
    """Process CKV_AWS_363 findings."""
    for lineno, line in enumerate(lines):
        if "CKV_AWS_363" in line and FRS in line:
            checkov_id, file_path, start_line = extract_finding_info(
                lines, lineno
            )
            if all([checkov_id, file_path, start_line]) and checkov_id == "CKV_AWS_363":
                process_ckv_aws_363_finding(
                    filename,
                    cast(str, file_path),
                    cast(int, start_line),
                )


def process_file(filename: str) -> None:
    """Process a single file."""
    logging.info("Processing: %s", filename)
    try:
        with open(filename, "r", encoding="utf-8") as file:
            lines = file.readlines()
        process_findings(filename, lines)
    except FileNotFoundError:
        logging.error("File not found: %s", filename)
    logging.info("Finished: %s", filename)


def process_files(pattern: str) -> None:
    """Process files matching a pattern."""
    logging.info("Started.")
    for filename in tqdm(
        glob.glob(pattern),
        desc="Processing files",
        unit="file",
        colour="magenta",
    ):
        process_file(filename)
    # Second pass: retry anything the log flagged as unprocessed.
    reprocess_from_log(LOG_FILE)
    logging.info("All processed.")


def reprocess_from_log(log_file: str) -> None:
    """Re-process findings from the log."""
    try:
        with open(log_file, "r", encoding="utf-8") as file:
            lines = file.readlines()
    except FileNotFoundError:
        logging.error("Log file not found: %s", log_file)
        return

    for line in lines:
        if "Finding not processed" in line:
            try:
                _, file_path, checkov_id, start_line_str = line.split(":")
                start_line = int(start_line_str.split()[0])
                add_skip_comment(
                    cast(str, file_path.strip()),
                    cast(str, checkov_id.strip()),
                    start_line - 1,
                )
            except (IndexError, ValueError) as error:
                logging.error(
                    "Reprocess error: %s - %s", line, error
                )


if __name__ == "__main__":
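
The body of the entry point is collapsed in this diff view. As a rough sketch only, the collapsed block presumably calls process_files with a glob pattern for Checkov output logs; the pattern below is an assumption for illustration, not the commit's actual code.

if __name__ == "__main__":
    # Hypothetical invocation; the real body is collapsed in the diff view
    # and "checkov_output*.log" is an assumed pattern, not from the commit.
    process_files("checkov_output*.log")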