Skip to content

Commit

Permalink
More polish.
Browse files Browse the repository at this point in the history
  • Loading branch information
AdnaneKhan committed May 19, 2024
1 parent a3f8686 commit 8e8d16e
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 130 deletions.
15 changes: 15 additions & 0 deletions gato/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ def yellow(cls, toformat: str):
return f'{Fore.YELLOW}{toformat}{Style.RESET_ALL}'
else:
return toformat

@classmethod
def blue(cls, toformat: str):
"""Makes the text blue and returns it.
Args:
toformat (str): Message to format.
Returns:
(str)): Formatted message.
"""
if cls not in cls._instances or Output().color:
return f'{Fore.CYAN}{toformat}{Style.RESET_ALL}'
else:
return toformat

@classmethod
def green(cls, toformat: str):
Expand Down
89 changes: 10 additions & 79 deletions gato/enumerate/recommender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gato.models.secret import Secret

from gato.enumerate.reports.actions import ActionsReport
from gato.enumerate.reports.runners import RunnersReport

class Recommender:

Expand All @@ -20,31 +21,9 @@ def print_repo_attack_recommendations(
scopes (list): List of scopes for user who ran Gato.
repository (Repository): Repository wrapper object.
"""
if repository.has_pwn_request():
report = ActionsReport()
report.report_pwn(repository)
ActionsReport.report_pwn(repository)
ActionsReport.report_injection(repository)

if repository.has_injection():
report = ActionsReport()
report.report_injection(repository)

# risks = repository.injection_risk


# for entry in risks:
# Output.result(
# f"The workflow {Output.bright(entry['workflow_name'])} runs on a risky trigger "
# f"and uses values by context within run/script steps!"
# )

# Output.tabbed(
# f"Examine the variables and gating: " + json.dumps(entry['details'], indent=4)
# )
# Output.info(f"Workflow URL: "
# f"{repository.repo_data['html_url']}/blob/"
# f"{repository.repo_data['default_branch']}/"
# f".github/workflows/{entry['workflow_name']}"
# )
if not repository.sh_runner_access:
if repository.is_admin():
Output.owned(
Expand Down Expand Up @@ -117,14 +96,6 @@ def print_repo_attack_recommendations(
"within the repository!"
)

elif repository.can_pull():
if repository.can_fork():
Output.inform(
"The user can only pull from the repository, but forking "
"is allowed! \n\tOnly a fork pull-request based attack would "
"be possible."
)

@staticmethod
def print_repo_secrets(scopes, secrets: list[Secret]):
"""Prints list of repository level secrets.
Expand Down Expand Up @@ -162,54 +133,14 @@ def print_repo_runner_info(repository: Repository):
repository (Repository): Repository wrapper object.
"""

if repository.sh_workflow_names:
Output.result(
f"The repository contains a workflow: "
f"{Output.bright(repository.sh_workflow_names[0])} that "
"might execute on self-hosted runners!"
)
# if repository.sh_workflow_names:
# Output.result(
# f"The repository contains a workflow: "
# f"{Output.bright(repository.sh_workflow_names[0])} that "
# "might execute on self-hosted runners!"
# )

if repository.accessible_runners:
non_ephemeral = False

for runner in repository.accessible_runners:
if runner.non_ephemeral:
Output.owned(
"The repository contains a non-ephemeral self-hosted runner!"
)
Output.tabbed(
"The runner name was: "
f"{Output.bright(runner.runner_name)}"
f" and the machine name was "
f"{Output.bright(runner.machine_name)}"
f" and the runner type was "
f"{Output.bright(runner.runner_type)}"
f" in the {Output.bright(runner.runner_group)} group"
f" with the following labels: "
f"{Output.bright(', '.join(runner.labels))}"
)
non_ephemeral = True
# Only print one non-ephemeral runner
break

# We don't need to print the ephemeral runners - this will
# still be captured in JSON output.
if not non_ephemeral:
Output.result(
f"The repository {Output.bright(repository.name)} contains a "
"previous workflow run that executed on a self-hosted runner!"
)
Output.tabbed(
"The runner name was: "
f"{Output.bright(repository.accessible_runners[0].runner_name)}"
f" and the machine name was "
f"{Output.bright(repository.accessible_runners[0].machine_name)}"
f" and the runner type was "
f"{Output.bright(repository.accessible_runners[0].runner_type)}"
f" in the {Output.bright(repository.accessible_runners[0].runner_group)} group"
f" with the following labels: "
f"{Output.bright(', '.join(repository.accessible_runners[0].labels))}"
)
RunnersReport.report_runners(repository)

if repository.runners:
Output.result(
Expand Down
113 changes: 71 additions & 42 deletions gato/enumerate/reports/actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import textwrap

from gato.cli.output import Output
from gato.configuration.configuration_manager import ConfigurationManager

Expand Down Expand Up @@ -39,37 +37,35 @@ class ActionsReport(Report):
"user controlled, then they can be used to inject arbitrary code into the workflow."
)

def __init__(self):
"""
"""
super().__init__()

def report_pwn(self, repo: Repository):
@classmethod
def report_pwn(cls, repo: Repository):
"""Report Pwn Requests in the repository in a clean, human readable format.
"""
if repo.has_pwn_request():
self.print_divider()
self.print_header(repo, "Actions Pwn Requests")
cls.print_divider()
cls.print_header(repo, "Actions Pwn Requests")
risks = repo.pwn_req_risk
for entry in risks:
self.__print_pwn_details(repo, entry)
cls.__print_pwn_details(repo, entry)

self.print_divider()
cls.print_divider()

def report_injection(self, repo: Repository):
"""
@classmethod
def report_injection(cls, repo: Repository):
"""Report injection for the repository.
"""
if repo.has_injection():
self.print_divider()
self.print_header(repo, "Actions Script Injection")
cls.print_divider()
cls.print_header(repo, "Actions Script Injection")
risks = repo.injection_risk
for entry in risks:
self.__print_injection_details(repo, entry)
cls.__print_injection_details(repo, entry)

self.print_divider()
cls.print_divider()

@classmethod
def __reusable_details(
self,
cls,
workflow_name: str,
workflow_url: str,
issue_type: str,
Expand All @@ -84,19 +80,21 @@ def __reusable_details(
Output.generic(f" Issue Type: {issue_type}")
Output.generic(f' Trigger(s): {Output.yellow(",".join(triggers))}')
Output.generic(f" Details: {description}")
Output.generic(f" Workflow URL: {workflow_url}")
Output.generic(f" Workflow URL: {Output.blue(workflow_url)}")
Output.generic(f' Confidence: {Output.red(designation)}')


@classmethod
def __check_mutable(self, repository, entry, step):
# If we have a sha, it's immutable so skip.
if 'github.event.pull_request.head.sha' in step['ref'].lower():
repository.clear_pwn_request(entry['workflow_name'])
return False
return True

@classmethod
def __report_jobs(self, candidate, details):

"""
"""
lines = []

lines.append(f"{'~'*78}")
Expand All @@ -111,15 +109,16 @@ def __report_jobs(self, candidate, details):

return lines

@classmethod
def ___report_injection(self, candidate, details):
"""
"""Create lines for each part of the injection report.
"""
lines = []
confidence = 'UNKNOWN'

lines.append(f"{'~'*78}")
lines.append(f' Job: {Output.bright(candidate)}')

if details.get('if_check', ''):
if details["if_check"].startswith('RESTRICTED'):
confidence = "LOW"
Expand All @@ -131,16 +130,18 @@ def ___report_injection(self, candidate, details):

lines.append(f"Step: {step_name}")
for var in val["variables"]:
if not confidence and var in ConfigurationManager().WORKFLOW_PARSING['UNSAFE_CONTEXTS']:
if confidence == 'UNKNOWN' and \
var in ConfigurationManager().WORKFLOW_PARSING['UNSAFE_CONTEXTS']:
confidence = "HIGH"
lines.append(f'Variables: {", ".join(val["variables"])}')
if 'if_checks' in val and val['if_checks']:
lines.append(f' Step If-check: {val["if_checks"]}')

return lines, confidence

def __print_injection_details(self, repository: Repository, entry: dict):
"""
@classmethod
def __print_injection_details(cls, repository: Repository, entry: dict):
"""Print details about potential GitHub Actions script injections.
"""
designation = "UNKNOWN"
workflow_url = (
Expand All @@ -157,25 +158,25 @@ def __print_injection_details(self, repository: Repository, entry: dict):
if k == 'triggers':
continue
else:
entries, confidence = self.___report_injection(k, v)
entries, confidence = cls.___report_injection(k, v)
# Only goes up.
if designation == 'UNKNOWN':
designation = confidence
job_reports.extend(entries)

self.__reusable_details(
cls.__reusable_details(
entry['workflow_name'],
workflow_url,
"Actions Injection",
self.ACTIONS_INJECTION,
cls.ACTIONS_INJECTION,
injection["triggers"],
designation
)
for line in job_reports:
Output.generic(line)


def __print_pwn_details(self, repository: Repository, entry: dict):
@classmethod
def __print_pwn_details(cls, repository: Repository, entry: dict):
"""
"""
designation = "UNKNOWN"
Expand All @@ -190,29 +191,43 @@ def __print_pwn_details(self, repository: Repository, entry: dict):
job_reports = []
for candidate, details in pwn_req['candidates'].items():
for step in details['steps']:
if not self.__check_mutable(repository, entry, step):
if not cls.__check_mutable(repository, entry, step):
return False

if details['confidence'] and details['confidence'] in \
['MEDIUM','HIGH'] and designation in ['UNKNOWN','LOW']:
designation = details['confidence']
job_reports.extend(self.__report_jobs(candidate, details))
self.__reusable_details(entry['workflow_name'], workflow_url, "Pwn Request with Approval TOCTOU", self.ENVIRONMENT_TOCTOU, pwn_req["triggers"], designation)
job_reports.extend(cls.__report_jobs(candidate, details))
cls.__reusable_details(
entry['workflow_name'],
workflow_url,
"Pwn Request with Approval TOCTOU",
cls.ENVIRONMENT_TOCTOU,
pwn_req["triggers"],
designation
)
for line in job_reports:
Output.generic(line)
elif len(pwn_req['triggers']) == 1 and pwn_req['triggers'][0] == 'pull_request_target:labeled':
job_reports = []
for candidate, details in pwn_req['candidates'].items():
for step in details['steps']:
if not self.__check_mutable(repository, entry, step):
if not cls.__check_mutable(repository, entry, step):
return False

if details['confidence'] and details['confidence'] in \
['MEDIUM','HIGH'] and designation in ['UNKNOWN','LOW']:
designation = details['confidence']

job_reports.extend(self.__report_jobs(candidate, details))
self.__reusable_details(entry['workflow_name'], workflow_url,"Pwn Request with Label TOCTOU", self.LABEL_TOCTOU, pwn_req["triggers"], designation)
job_reports.extend(cls.__report_jobs(candidate, details))
cls.__reusable_details(
entry['workflow_name'],
workflow_url,
"Pwn Request with Label TOCTOU",
cls.LABEL_TOCTOU,
pwn_req["triggers"],
designation
)
for line in job_reports:
Output.generic(line)
else:
Expand All @@ -221,7 +236,7 @@ def __print_pwn_details(self, repository: Repository, entry: dict):
for candidate, details in pwn_req['candidates'].items():
if details['gated']:
for step in details['steps']:
if not self.__check_mutable(repository, entry, step) and \
if not cls.__check_mutable(repository, entry, step) and \
(not ("issue_comment" in pwn_req["triggers"] and len(pwn_req["triggers"]) == 1)):
return False
else:
Expand All @@ -230,15 +245,29 @@ def __print_pwn_details(self, repository: Repository, entry: dict):
if details['confidence'] and details['confidence'] in \
['MEDIUM','HIGH'] and designation in ['UNKNOWN','LOW']:
designation = details['confidence']
job_reports.extend(self.__report_jobs(candidate, details))
job_reports.extend(cls.__report_jobs(candidate, details))

if toctou:
# If we got here then it is at least medium.
if designation == 'UNKNOWN':
designation = 'MEDIUM'
self.__reusable_details(entry['workflow_name'], workflow_url, "Pwn Request With Permission TOCTOU", self.PERMISSION_TOCTOU, pwn_req["triggers"], designation)
cls.__reusable_details(
entry['workflow_name'],
workflow_url,
"Pwn Request With Permission TOCTOU",
cls.PERMISSION_TOCTOU,
pwn_req["triggers"],
designation
)
else:
self.__reusable_details(entry['workflow_name'], workflow_url, "Pwn Request", self.PWN_REQUEST, pwn_req["triggers"], designation)
cls.__reusable_details(
entry['workflow_name'],
workflow_url,
"Pwn Request",
cls.PWN_REQUEST,
pwn_req["triggers"],
designation
)

for line in job_reports:
Output.generic(line)
Loading

0 comments on commit 8e8d16e

Please sign in to comment.