Skip to content

Commit

Permalink
improve memory usage and responsiveness?
Browse files Browse the repository at this point in the history
  • Loading branch information
elesiuta committed Aug 31, 2021
1 parent 87dc036 commit 82c1e04
Showing 1 changed file with 40 additions and 34 deletions.
74 changes: 40 additions & 34 deletions picosnitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
except Exception:
system_notification = lambda title, message, app_name: print(message)

VERSION = "0.4.6"
VERSION = "0.4.7"


class Daemon:
Expand Down Expand Up @@ -289,13 +289,15 @@ def drop_root_privileges() -> None:
os.setuid(int(os.getenv("SUDO_UID")))


def terminate_snitch_updater(snitch: dict, q_error: multiprocessing.Queue):
def terminate_snitch_updater(snitch: dict, q_error: multiprocessing.Queue, q_monitor_term, snitch_pipe):
"""write snitch one last time"""
while not q_error.empty():
error = q_error.get()
snitch["Errors"].append(time.ctime() + " " + error)
toast(error, file=sys.stderr)
write(snitch)
q_monitor_term.put("TERMINATE")
snitch_pipe.close()
sys.exit(0)


Expand Down Expand Up @@ -376,9 +378,10 @@ def get_vt_results(snitch: dict, q_vt: multiprocessing.Queue, check_pending: boo

def safe_q_get(q: multiprocessing.Queue, q_term: multiprocessing.Queue):
"""prevent the updater subprocess from hanging on the next request/result check if p_sha or p_psutil die"""
parent_process = multiprocessing.parent_process()
while True:
try:
if not q_term.empty() or not multiprocessing.parent_process().is_alive():
if not q_term.empty() or not parent_process.is_alive():
os.kill(os.getpid(), signal.SIGTERM)
# raise Exception("Process terminated")
# sys.exit(1)
Expand Down Expand Up @@ -596,14 +599,15 @@ def update_snitch(snitch: dict, proc: dict, conn: dict, sha256: str, ctime: str,


def updater_subprocess(p_virustotal, init_scan, init_pickle,
q_snitch, q_error,
q_snitch, q_error, snitch_pipe, q_monitor_term,
q_sha_pending, q_sha_results,
q_psutil_pending, q_psutil_results,
q_vt_pending, q_vt_results,
q_updater_restart, q_updater_ready, q_updater_term
):
"""main subprocess where snitch.json is updated with new connections and the user is notified"""
# drop root privileges and init variables for loop
parent_process = multiprocessing.parent_process()
drop_root_privileges()
pickle_path = os.path.join(os.path.expanduser("~"), ".config", "picosnitch", "pickle.tmp")
if init_pickle is None:
Expand All @@ -614,8 +618,8 @@ def updater_subprocess(p_virustotal, init_scan, init_pickle,
sizeof_snitch = sys.getsizeof(pickle.dumps(snitch))
last_write = 0
# init signal handlers
signal.signal(signal.SIGTERM, lambda *args: terminate_snitch_updater(snitch, q_error))
signal.signal(signal.SIGINT, lambda *args: terminate_snitch_updater(snitch, q_error))
signal.signal(signal.SIGTERM, lambda *args: terminate_snitch_updater(snitch, q_error, q_monitor_term, snitch_pipe))
signal.signal(signal.SIGINT, lambda *args: terminate_snitch_updater(snitch, q_error, q_monitor_term, snitch_pipe))
# update snitch with initial running processes and connections
if init_scan:
get_vt_results(snitch, q_vt_pending, True)
Expand All @@ -630,14 +634,12 @@ def updater_subprocess(p_virustotal, init_scan, init_pickle,
snitch["Errors"].append(time.ctime() + " " + error)
toast(error, file=sys.stderr)
# check if terminating
try:
_ = q_updater_term.get(block=False)
terminate_snitch_updater(snitch, q_error)
except queue.Empty:
if not multiprocessing.parent_process().is_alive():
snitch["Errors"].append(time.ctime() + " picosnitch has stopped")
toast("picosnitch has stopped", file=sys.stderr)
terminate_snitch_updater(snitch, q_error)
if not q_updater_term.empty():
terminate_snitch_updater(snitch, q_error, q_monitor_term, snitch_pipe)
if not parent_process.is_alive():
snitch["Errors"].append(time.ctime() + " picosnitch has stopped")
toast("picosnitch has stopped", file=sys.stderr)
terminate_snitch_updater(snitch, q_error, q_monitor_term, snitch_pipe)
# check if updater needs to restart
# try:
# _ = q_updater_restart.get(block=False)
Expand All @@ -648,15 +650,14 @@ def updater_subprocess(p_virustotal, init_scan, init_pickle,
# except queue.Empty:
# pass
# get list of new processes and connections since last update
time.sleep(5)
new_processes = []
try:
while True:
new_processes.append(q_snitch.get(block=False))
except queue.Empty:
pass
# time.sleep(5)
new_processes_q = []
snitch_pipe.poll(timeout=5)
while snitch_pipe.poll():
new_processes_q.append(snitch_pipe.recv_bytes())
# process the list and update snitch
new_processes = [pickle.loads(proc) for proc in new_processes]
new_processes = [pickle.loads(proc) for proc in new_processes_q]
del new_processes_q
missed_conns, update_snitch_pending = update_snitch_processor(snitch, known_pids, missed_conns, new_processes, q_updater_term, q_psutil_pending, q_psutil_results)
update_snitch_wrapper(snitch, update_snitch_pending, q_updater_term, q_sha_pending, q_sha_results, q_vt_pending)
get_vt_results(snitch, q_vt_results, False)
Expand All @@ -676,9 +677,10 @@ def updater_subprocess(p_virustotal, init_scan, init_pickle,
write(snitch)


def linux_monitor_subprocess(q_snitch, q_error, q_monitor_term):
def linux_monitor_subprocess(snitch_pipe, q_snitch, q_error, q_monitor_term):
"""runs a bpf program to monitor the system for new processes and connections and puts them in the queue"""
from bcc import BPF
parent_process = multiprocessing.parent_process()
if os.getuid() == 0:
b = BPF(text=bpf_text)
execve_fnname = b.get_syscall_fnname("execve")
Expand All @@ -698,33 +700,34 @@ def queue_exec_event(cpu, data, size):
argv[event.pid].append(event.argv)
elif event.type == 1: # EVENT_RET
argv_text = b' '.join(argv[event.pid]).replace(b'\n', b'\\n')
q_snitch.put(pickle.dumps({"type": "exec", "pid": event.pid, "name": event.comm.decode(), "cmdline": argv_text.decode()}))
snitch_pipe.send_bytes(pickle.dumps({"type": "exec", "pid": event.pid, "name": event.comm.decode(), "cmdline": argv_text.decode()}))
try:
del argv[event.pid]
except Exception:
pass
def queue_ipv4_event(cpu, data, size):
event = b["ipv4_events"].event(data)
q_snitch.put(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.task.decode(), "port": event.dport, "ip": socket.inet_ntop(socket.AF_INET, struct.pack("I", event.daddr))}))
snitch_pipe.send_bytes(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.task.decode(), "port": event.dport, "ip": socket.inet_ntop(socket.AF_INET, struct.pack("I", event.daddr))}))
def queue_ipv6_event(cpu, data, size):
event = b["ipv6_events"].event(data)
q_snitch.put(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.task.decode(), "port": event.dport, "ip": socket.inet_ntop(socket.AF_INET6, event.daddr)}))
snitch_pipe.send_bytes(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.task.decode(), "port": event.dport, "ip": socket.inet_ntop(socket.AF_INET6, event.daddr)}))
def queue_other_event(cpu, data, size):
event = b["other_socket_events"].event(data)
q_snitch.put(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.task.decode(), "port": 0, "ip": ""}))
snitch_pipe.send_bytes(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.task.decode(), "port": 0, "ip": ""}))
def queue_dns_event(cpu, data, size):
event = b["dns_events"].event(data)
q_snitch.put(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.comm.decode(), "port": 0, "ip": "", "host": event.host.decode()}))
snitch_pipe.send_bytes(pickle.dumps({"type": "conn", "pid": event.pid, "ppid": event.ppid, "name": event.comm.decode(), "port": 0, "ip": "", "host": event.host.decode()}))
b["exec_events"].open_perf_buffer(queue_exec_event)
b["ipv4_events"].open_perf_buffer(queue_ipv4_event)
b["ipv6_events"].open_perf_buffer(queue_ipv6_event)
b["other_socket_events"].open_perf_buffer(queue_other_event)
b["dns_events"].open_perf_buffer(queue_dns_event)
while True:
if not q_monitor_term.empty() or not multiprocessing.parent_process().is_alive():
time.sleep(5)
if not q_monitor_term.empty() or not parent_process.is_alive():
return 0
try:
b.perf_buffer_poll(timeout=15)
b.perf_buffer_poll(timeout=0)
except Exception as e:
error = "BPF " + type(e).__name__ + str(e.args)
q_error.put(error)
Expand All @@ -735,10 +738,11 @@ def queue_dns_event(cpu, data, size):

def func_subprocess(func: typing.Callable, q_pending, q_results, q_term):
"""wrapper function for subprocess"""
parent_process = multiprocessing.parent_process()
last_error = 0
while True:
try:
if not q_term.empty() or not multiprocessing.parent_process().is_alive():
if not q_term.empty() or not parent_process.is_alive():
return 0
arg = pickle.loads(q_pending.get(block=True, timeout=15))
q_results.put(pickle.dumps(func(arg)))
Expand All @@ -754,6 +758,7 @@ def func_subprocess(func: typing.Callable, q_pending, q_results, q_term):

def virustotal_subprocess(config: dict, q_vt_pending, q_vt_results, q_vt_term):
"""get virustotal results of process executable"""
parent_process = multiprocessing.parent_process()
drop_root_privileges()
try:
import vt
Expand All @@ -763,7 +768,7 @@ def virustotal_subprocess(config: dict, q_vt_pending, q_vt_results, q_vt_term):
last_error = 0
while True:
try:
if not q_vt_term.empty() or not multiprocessing.parent_process().is_alive():
if not q_vt_term.empty() or not parent_process.is_alive():
return 0
time.sleep(config["VT limit request"])
proc, sha256 = pickle.loads(q_vt_pending.get(block=True, timeout=15))
Expand Down Expand Up @@ -812,14 +817,15 @@ def picosnitch_master_process(config, snitch_updater_pickle):
print("Did not detect a supported operating system", file=sys.stderr)
return 1
# start subprocesses
p_monitor = ProcessManager(name="snitchmonitor", target=monitor_subprocess)
snitch_updater_pipe, snitch_monitor_pipe = multiprocessing.Pipe(duplex=False)
p_monitor = ProcessManager(name="snitchmonitor", target=monitor_subprocess, init_args=(snitch_monitor_pipe,))
q_snitch, q_error = p_monitor.q_in, p_monitor.q_out
p_sha = ProcessManager(name="snitchsha", target=func_subprocess, init_args=(functools.lru_cache(get_sha256),))
p_psutil = ProcessManager(name="snitchpsutil", target=func_subprocess, init_args=(get_proc_info,))
p_virustotal = ProcessManager(name="snitchvirustotal", target=virustotal_subprocess, init_args=(config,))
p_updater = ProcessManager(name="snitchupdater", target=updater_subprocess,
extra_args=(p_virustotal.p, True, snitch_updater_pickle),
init_args=(q_snitch, q_error,
init_args=(q_snitch, q_error, snitch_updater_pipe, p_monitor.q_term,
p_sha.q_in, p_sha.q_out,
p_psutil.q_in, p_psutil.q_out,
p_virustotal.q_in, p_virustotal.q_out)
Expand Down

0 comments on commit 82c1e04

Please sign in to comment.