This repository has been archived by the owner on Apr 26, 2021. It is now read-only.

Azure Machinery for Cuckoo #3120

Open · wants to merge 64 commits into base: master

Commits (64)
a7c288f
Initial commit for Azure machinery
cccs-kevin Feb 4, 2020
e3cd8ff
NICs weren't being deleted when instance creation failed
cccs-kevin May 29, 2020
5e69fa6
Moved API calls to the beginning, since they were being made more oft…
cccs-kevin Jul 3, 2020
534fa13
Adding ability to specify devops environment, such that multiple Cuck…
cccs-kevin Jul 22, 2020
d01b16e
Added ability to use multiple guest os versions
cccs-kevin Jul 24, 2020
b865fff
Removing files deleted by AV
cccs-kevin Oct 8, 2020
00d7e9b
Refactoring az.py so that scaling is in tune with DB, also edited som…
cccs-kevin Oct 8, 2020
f7376d7
Added more comments, updated test conf files
cccs-kevin Oct 15, 2020
07ded61
Some renaming of methods, added handling of snapshot id for non-reque…
cccs-kevin Oct 16, 2020
4ee39d0
Correcting method name
cccs-kevin Oct 16, 2020
1874a29
Mistake on naming methods addressed
cccs-kevin Oct 16, 2020
783f41d
Added collision avoidance on instances
cccs-kevin Oct 16, 2020
2f63291
Adding platform for Ubuntu
cccs-kevin Oct 19, 2020
d830b73
Adding better logging regarding acquiring machines
cccs-kevin Oct 28, 2020
889c031
Moving acquire logging to machinery
cccs-kevin Oct 28, 2020
8393407
Tweaking what tags contains when being sent to acquire in abstracts
cccs-kevin Oct 28, 2020
2147975
Adding case if no machine exists in DB
cccs-kevin Oct 29, 2020
c5dc312
Fine-tuning the acquire method
cccs-kevin Oct 29, 2020
84b4698
Adding method to tag disk, adding class constants
cccs-kevin Oct 30, 2020
52408ed
Small logic tweak
cccs-kevin Oct 30, 2020
f0b565b
Correcting platform value for Ubuntu
cccs-kevin Oct 30, 2020
7e2966d
Revised marking disks for deletion
cccs-kevin Nov 5, 2020
d8631d7
Adding logging for api limits
cccs-kevin Dec 2, 2020
0b2d21d
Reworking Azure API errors, resizing machines being created when vm f…
cccs-kevin Dec 2, 2020
0779544
Fixing logic bug in peak throughput limiting
cccs-kevin Dec 3, 2020
abc39a8
Replaced API calls that would mark nic/disk for deletion with set
cccs-kevin Dec 4, 2020
3af045e
Adding polling thread that deletes leftover resources
cccs-kevin Dec 4, 2020
24475f3
Giving the people what they want!
cccs-kevin Dec 4, 2020
bb79317
Have patience!
cccs-kevin Dec 4, 2020
e2afa6e
Added clause that will delete hanging VMs that were created by Cuckoo
cccs-kevin Dec 7, 2020
f22ffd5
Fixed logical bug where in some cases the availables weren't relevant…
cccs-kevin Dec 7, 2020
4e770c1
Adding ability to use Azure Spot Instances by checking for VM status …
cccs-kevin Dec 9, 2020
3893201
Tweaked logic in scheduler such that a pending task will only be fetc…
cccs-kevin Dec 10, 2020
c610585
Additional machinery for Azure using virtual machine scale sets
cccs-kevin Jan 4, 2021
bb43937
Updating VMSS machinery
cccs-kevin Jan 4, 2021
6e45832
Small tweaks
cccs-kevin Jan 11, 2021
ba84673
Adding additional checks to ensure up-time
cccs-kevin Jan 15, 2021
f3a1418
Improved error-handling and logging
cccs-kevin Jan 19, 2021
960f315
More thorough error-handling
cccs-kevin Jan 25, 2021
a7c28ce
Adding logic to handle Azure error states, improved error-handling an…
cccs-kevin Jan 27, 2021
2a4c142
Wrapping _thr_scale_machine_pool in try-catch to find errors, improvi…
cccs-kevin Jan 28, 2021
0d5d672
Adding more logging, additional logic in scaling machine pools, and f…
cccs-kevin Feb 2, 2021
462b726
Removed bottleneck when dealing with multiple tasks directed for diff…
cccs-kevin Feb 3, 2021
85c7b68
Improving exception handling, adding method to recreate failed VMSS
cccs-kevin Mar 15, 2021
6e3df73
Adding default tag value
cccs-kevin Mar 16, 2021
ec0dd19
Optimizing speed at which a task comes off the queue and is assigned …
cccs-kevin May 5, 2021
13af923
Ordering tasks by FIFO
cccs-kevin May 13, 2021
3e60bc0
Adding catch for psycopg UniqueViolation when adding sample
cccs-kevin May 13, 2021
25c7ecc
Adding try catch for better logging
cccs-kevin May 14, 2021
0923f0c
Adding platform support
cccs-kevin May 17, 2021
564a166
Scaling based on platform
cccs-kevin May 25, 2021
ff31944
Preventing IP collision
cccs-kevin May 28, 2021
a5cb11b
Adding logging if machine doesn't exist upon delete
cccs-kevin May 28, 2021
59edd91
Cannot use Manual
cccs-kevin May 31, 2021
a3d7afc
Revamp stop() and delete_vm_from_vmss() to avoid errors in Azure
cccs-kevin Jun 4, 2021
b3f174a
This polling is required if the Azure VM agent is not installed in VMs
cccs-kevin Jun 10, 2021
0165f37
New information about how Azure VMSSs work caused more reliance on ba…
cccs-kevin Jun 18, 2021
060e850
Adding missing global variable
cccs-kevin Jun 29, 2021
03f50f2
Removing VMs in delete phase if error occurs
cccs-kevin Jul 6, 2021
b4a41f1
Making program threadsafe, adding handling to spot instance deletion …
cccs-kevin Jul 14, 2021
95eacd2
Adding handling for 'badrequest' Azure error
cccs-kevin Jul 16, 2021
43455aa
Bug fix if tag doesn't exist
cccs-kevin Aug 3, 2021
00f318e
Adding a scaling ceiling based on quota availability
cccs-kevin Apr 25, 2022
74ad92c
Fixing bug in usage quota logic
cccs-kevin May 3, 2022
cuckoo/common/config.py (57 additions, 0 deletions)

@@ -267,6 +267,63 @@ class Config(object):
                 "guacd_port": Int(4822),
             },
         },
+        "az": {
+            "az": {
+                "region_name": String("earth"),
+                "group": String("malware_fighters"),
+                "subscription_id": String(
+                    "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
+                "client_id": String("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
+                "secret": String("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"),
+                "tenant": String("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
+                "machines": List(String, "cuckoo1"),
+                "interface": String("eth1"),
+                "running_machines_gap": Int(110),
+                "vnet": String("cuckoo-vnet"),
+                "cuckoo_subnet": String("cuckoo-subnet"),
+                "environment": String("staging"),
+                "dynamic_machines_limit": Int(10),
+                "instance_type": String("average"),
+                "options": List(String, None, ",\\s"),
+                "tags": String(),
+                "resultserver_ip": String("192.168.54.111"),
+                "resultserver_port": Int(2042),
+                "guest_snapshot": List(String, None, ",\\s"),
+                "storage_account_type": String("sample-type"),
+                "initial_pool_size": Int(1),
+            },
+        },
+        "az_with_vmss": {
+            "az_with_vmss": {
+                "region_name": String("earth"),
+                "group": String("malware_fighters"),
+                "subscription_id": String(
+                    "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
+                "client_id": String("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
+                "secret": String("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"),
+                "tenant": String("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
+                "machines": List(String, "cuckoo1"),
+                "interface": String("eth1"),
+                "vnet": String("cuckoo-vnet"),
+                "subnet": String("cuckoo-subnet"),
+                "environment": String("staging"),
+                "machine_pool_limit": Int(10),
+                "total_machines_limit": Int(50),
+                "instance_type": String("average"),
+                "options": List(String, None, ",\\s"),
+                "tags": String(),
+                "resultserver_ip": String("192.168.54.111"),
+                "resultserver_port": Int(2042),
+                "gallery_name": String("Louvre"),
+                "gallery_image_names": List(String, None, ",\\s"),
+                "storage_account_type": String("sample-type"),
+                "initial_pool_size": Int(1),
+                "supported_os_tags": List(String, None, ",\\s"),
+                "overprovision": Int(0),
+                "wait_time_to_reimage": Int(30),
+                "spot_instances": Boolean(False),
+            },
+        },
         "virtualbox": {
             "virtualbox": {
                 "mode": String("headless"),
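
These defaults back the new az and az_with_vmss machinery sections. At runtime they are resolved through Cuckoo's config() helper with a section:subsection:key path, the same pattern the guest.py change below uses for cuckoo:timeouts:vm_state. A minimal sketch of how a machinery module would read them, assuming the defaults above are merged and values in $CWD/conf may override them (the variable names are illustrative):

# Sketch: resolving the new az defaults via Cuckoo's config() helper.
# Keys mirror the block above; any value set in the corresponding conf
# file wins over these defaults.
from cuckoo.common.config import config

region = config("az:az:region_name")                    # "earth" by default
machine_limit = config("az:az:dynamic_machines_limit")  # Int, default 10
snapshots = config("az:az:guest_snapshot")              # List split on ",\s"
use_spot = config("az_with_vmss:az_with_vmss:spot_instances")  # Boolean, default False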
cuckoo/core/database.py (2 additions, 1 deletion)

@@ -19,6 +19,7 @@
 from cuckoo.common.utils import Singleton, classlock, json_encode, parse_bool
 from cuckoo.misc import cwd, format_command
 
+from psycopg2.errors import UniqueViolation
 from sqlalchemy import create_engine, Column, not_, func
 from sqlalchemy import Integer, String, Boolean, DateTime, Enum
 from sqlalchemy import ForeignKey, Text, Index, Table, TypeDecorator

@@ -1063,7 +1064,7 @@ def add(self, obj, timeout=0, package="", options="", priority=1,
 
         try:
             session.commit()
-        except IntegrityError:
+        except (IntegrityError, UniqueViolation):
             session.rollback()
             try:
                 sample = session.query(Sample).filter_by(md5=obj.get_md5()).first()
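
The widened except clause matters under the reworked scheduler, where concurrent submissions can race to insert the same Sample row. SQLAlchemy normally wraps the driver error as sqlalchemy.exc.IntegrityError, but also catching psycopg2's UniqueViolation covers paths where the raw driver error surfaces. A minimal standalone sketch of the same pattern, assuming a PostgreSQL backend with psycopg2 (the helper name is illustrative, not part of the PR):

# Sketch of the dual-catch pattern used in Database.add(), assuming
# PostgreSQL with psycopg2 as the active driver.
from psycopg2.errors import UniqueViolation
from sqlalchemy.exc import IntegrityError

def commit_or_rollback(session):
    try:
        session.commit()
        return True
    except (IntegrityError, UniqueViolation):
        # Another worker inserted the same unique row first; roll back so
        # the caller can re-query the existing record instead of failing.
        session.rollback()
        return False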
cuckoo/core/guest.py (13 additions, 4 deletions)

@@ -341,7 +341,7 @@ def post(self, method, *args, **kwargs):
 
     def wait_available(self):
        """Wait until the Virtual Machine is available for usage."""
-        end = time.time() + self.timeout
+        end = time.time() + config("cuckoo:timeouts:vm_state")
 
         while db.guest_get_status(self.task_id) == "starting" and self.do_run:
             try:

@@ -351,7 +351,7 @@
                 log.debug("%s: not ready yet", self.vmid)
             except socket.error:
                 log.debug("%s: not ready yet", self.vmid)
-            time.sleep(1)
+            time.sleep(10)
 
             if time.time() > end:
                 raise CuckooGuestCriticalTimeout(

@@ -485,7 +485,16 @@ def start_analysis(self, options, monitor):
         # Pin the Agent to our IP address so that it is not accessible by
         # other Virtual Machines etc.
         if "pinning" in features:
-            self.get("/pinning")
+            strikes = 5
+            for strike in range(strikes):
+                try:
+                    self.get("/pinning")
+                    break
+                except Exception:
+                    if strike == strikes-1:
+                        raise
+                    log.warning("Attempt #%s to pin machine %s %s" % (strike+1, self.vmid, self.ipaddr))
+                    time.sleep(30)
 
         # Obtain the environment variables.
         self.query_environ()

@@ -560,7 +569,7 @@ def wait_for_completion(self):
                # wait for things to recover
                log.warning(
                    "Virtual Machine /status failed. This can indicate the "
-                   "guest losing network connectivity"
+                   "guest losing network connectivity with %s" % self.vmid
                )
                continue
            except Exception as e:
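
Two themes in this file: polling is relaxed for cloud guests (the agent can take longer to come up, and the VM-state timeout now comes from cuckoo.conf), and the /pinning call is retried instead of failing the task on one transient error. The retry loop generalizes to the shape below, shown as a hedged standalone helper rather than code from the PR:

import logging
import time

log = logging.getLogger(__name__)

def retry_call(func, strikes=5, delay=30):
    """Retry a flaky call, re-raising only after the final strike.

    Generalized from the /pinning loop above; illustrative only.
    """
    for strike in range(strikes):
        try:
            return func()
        except Exception:
            if strike == strikes - 1:
                raise
            log.warning("Attempt #%s failed, retrying in %ss", strike + 1, delay)
            time.sleep(delay)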
cuckoo/core/scheduler.py (55 additions, 48 deletions)

@@ -20,7 +20,7 @@
 )
 from cuckoo.common.objects import File
 from cuckoo.common.files import Folders
-from cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED
+from cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED, TASK_RUNNING, TASK_PENDING
 from cuckoo.core.guest import GuestManager
 from cuckoo.core.plugins import RunAuxiliary, RunProcessing
 from cuckoo.core.plugins import RunSignatures, RunReporting

@@ -162,7 +162,7 @@ def acquire_machine(self):
             # In some cases it's possible that we enter this loop without
             # having any available machines. We should make sure this is not
             # such case, or the analysis task will fail completely.
-            if not machinery.availables():
+            if not machinery.availables(label=self.task.machine, platform=self.task.platform, tags=self.task.tags):
                 machine_lock.release()
                 time.sleep(1)
                 continue

@@ -224,13 +224,6 @@ def build_options(self):
             options["timeout"] = self.cfg.timeouts.default
         else:
             options["timeout"] = self.task.timeout
-
-        # copy in other analyzer specific options, TEMPORARY (most likely)
-        vm_options = getattr(machinery.options, self.machine.name)
-        for k in vm_options:
-            if k.startswith("analyzer_"):
-                options[k] = vm_options[k]
-
         return options
 
     def route_network(self):

@@ -965,11 +958,18 @@ def _cleanup_managers(self):
                 cleaned.add(am)
         return cleaned
 
+    def _thr_periodic_log(self):
+        log.debug("# Tasks: %d; # Available Machines: %d; # Locked Machines: %d; # Total Machines: %d;",
+                  self.db.count_tasks(status=TASK_PENDING), self.db.count_machines_available(),
+                  len(self.db.list_machines(locked=True)), len(self.db.list_machines()))
+        threading.Timer(10, self._thr_periodic_log).start()
+
     def start(self):
         """Start scheduler."""
         self.initialize()
 
         log.info("Waiting for analysis tasks.")
+        self._thr_periodic_log()
 
         # Message queue with threads to transmit exceptions (used as IPC).
         errors = Queue.Queue()

@@ -978,27 +978,12 @@
         if self.maxcount is None:
             self.maxcount = self.cfg.cuckoo.max_analysis_count
 
+        launched_analysis = True
         # This loop runs forever.
         while self.running:
-            time.sleep(1)
-
-            # Run cleanup on finished analysis managers and untrack them
-            for am in self._cleanup_managers():
-                self.analysis_managers.discard(am)
-
-            # Wait until the machine lock is not locked. This is only the case
-            # when all machines are fully running, rather that about to start
-            # or still busy starting. This way we won't have race conditions
-            # with finding out there are no available machines in the analysis
-            # manager or having two analyses pick the same machine.
-            if not machine_lock.acquire(False):
-                logger(
-                    "Could not acquire machine lock",
-                    action="scheduler.machine_lock", status="busy"
-                )
-                continue
-
-            machine_lock.release()
+            if not launched_analysis:
+                time.sleep(1)
+            launched_analysis = False
 
             # If not enough free disk space is available, then we print an
             # error message and wait another round (this check is ignored

@@ -1064,28 +1049,50 @@
                 )
                 continue
 
-            # Fetch a pending analysis task.
-            # TODO This fixes only submissions by --machine, need to add
-            # other attributes (tags etc).
-            # TODO We should probably move the entire "acquire machine" logic
-            # from the Analysis Manager to the Scheduler and then pass the
-            # selected machine onto the Analysis Manager instance.
-            task, available = None, False
-            for machine in self.db.get_available_machines():
-                task = self.db.fetch(machine=machine.name)
-                if task:
-                    break
-
-                if machine.is_analysis():
-                    available = True
-
+            # Get all tasks in the queue
+            tasks = self.db.list_tasks(status=TASK_PENDING, details=True, order_by="added_on")
+            if not tasks:
+                continue
+            for task in tasks:
+                # Run cleanup on finished analysis managers and untrack them
+                for am in self._cleanup_managers():
+                    self.analysis_managers.discard(am)
+
+                # Wait until the machine lock is not locked. This is only the case
+                # when all machines are fully running, rather that about to start
+                # or still busy starting. This way we won't have race conditions
+                # with finding out there are no available machines in the analysis
+                # manager or having two analyses pick the same machine.
+                if not machine_lock.acquire(False):
+                    logger(
+                        "Could not acquire machine lock",
+                        action="scheduler.machine_lock", status="busy"
+                    )
+                    continue
+                machine_lock.release()
+
+                available = False
+                # Note that label > platform > tags
+                if task.machine:
+                    if machinery.availables(label=task.machine):
+                        available = True
+                elif task.platform:
+                    if machinery.availables(platform=task.platform):
+                        available = True
+                elif task.tags:
+                    tag_names = [tag.name for tag in task.tags]
+                    if machinery.availables(tags=tag_names):
+                        available = True
+                else:
+                    available = True
+
             # We only fetch a new task if at least one of the available
             # machines is not a "service" machine (again, please refer to the
             # services auxiliary module for more information on service VMs).
-            if not task and available:
-                task = self.db.fetch(service=False)
-
-            if task:
+                if not available:
+                    continue
+
+                self.db.set_status(task.id, TASK_RUNNING)
                 log.debug("Processing task #%s", task.id)
                 self.total_analysis_count += 1

@@ -1094,7 +1101,7 @@ def start(self):
                 analysis.daemon = True
                 analysis.start()
                 self.analysis_managers.add(analysis)
-
+                launched_analysis = True
             # Deal with errors.
             try:
                 raise errors.get(block=False)
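
The reworked loop replaces "pick a machine, then fetch a matching task" with "walk the pending queue in FIFO order and start any task a machine can serve", which removes the head-of-line blocking between platforms noted in the commit log. The availability check follows a strict precedence, label > platform > tags; condensed into one function for clarity, as a sketch against the machinery.availables() keyword interface shown above (the function name is illustrative):

def can_serve(task, machinery):
    # Precedence mirrors the diff: an explicit machine label wins, then
    # the requested platform, then the tag set; an unconstrained task can
    # run on any available machine.
    if task.machine:
        return bool(machinery.availables(label=task.machine))
    if task.platform:
        return bool(machinery.availables(platform=task.platform))
    if task.tags:
        return bool(machinery.availables(tags=[tag.name for tag in task.tags]))
    return True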