Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logger): Make logger configurable #21

Merged
merged 2 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unittests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ jobs:

- name: Tests
run: |
python -m pytest
python -m pytest -s
9 changes: 9 additions & 0 deletions examples/shell_scripts/send_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
import argparse
import json
import sys
import logging
from cscapi.client import CAPIClient, CAPIClientConfig
from cscapi.sql_storage import SQLStorage
from cscapi.utils import create_signal
from cscapi.utils import generate_machine_id_from_key

logger = logging.getLogger("capi-py-sdk")
logger.setLevel(logging.DEBUG) # Change this to the level you want
console_handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


class CustomHelpFormatter(argparse.HelpFormatter):
def __init__(self, prog, indent_increment=2, max_help_position=48, width=None):
Expand Down Expand Up @@ -121,6 +129,7 @@ def __init__(self, prog, indent_increment=2, max_help_position=48, width=None):
scenarios=machine_scenarios,
prod=args.prod,
user_agent_prefix=args.user_agent_prefix,
logger=logger,
),
)

Expand Down
48 changes: 26 additions & 22 deletions src/cscapi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from more_itertools import batched

__version__ = metadata.version("cscapi").split("+")[0]

logging.getLogger("capi-py-sdk").addHandler(logging.NullHandler())
null_logger = logging.getLogger("capi-py-sdk")
null_logger.addHandler(logging.NullHandler())

CAPI_BASE_URL = "https://api.crowdsec.net/v3"
CAPI_BASE_DEV_URL = "https://api.dev.crowdsec.net/v3"
Expand All @@ -26,18 +26,20 @@
CAPI_METRICS_ENDPOINT = "/metrics"


def has_valid_token(machine: MachineModel, latency_offset=10) -> bool:
logging.debug(f"checking if token is valid for machine {machine.machine_id}")
def has_valid_token(
machine: MachineModel, latency_offset=10, logger: logging.Logger = null_logger
) -> bool:
logger.debug(f"checking if token is valid for machine {machine.machine_id}")
try:
payload = jwt.decode(machine.token, options={"verify_signature": False})
except jwt.exceptions.DecodeError:
logging.debug(
logger.debug(
f"could not decode token {machine.token} for machine {machine.machine_id}"
)
return False
current_time = time.time()
has_enough_ttl = current_time - latency_offset < payload["exp"]
logging.debug(
logger.debug(
f"token for machine {machine.machine_id} has_enough_ttl = {has_enough_ttl}"
)
return has_enough_ttl
Expand All @@ -51,6 +53,7 @@ class CAPIClientConfig:
max_retries: int = 3
latency_offset: int = 10
retry_delay: int = 5
logger: logging.Logger = null_logger


def _group_signals_by_machine_id(
Expand All @@ -69,6 +72,7 @@ def __init__(self, storage: StorageInterface, config: CAPIClientConfig):
self.latency_offset = config.latency_offset
self.max_retries = config.max_retries
self.retry_delay = config.retry_delay
self.logger = config.logger or null_logger

self.url = CAPI_BASE_URL if config.prod else CAPI_BASE_DEV_URL

Expand Down Expand Up @@ -115,11 +119,11 @@ def _send_signals_by_machine_id(
attempt_count = 0

while machines_to_process_attempts:
logging.info(f"attempt {attempt_count} to send signals")
self.logger.info(f"attempt {attempt_count} to send signals")
retry_machines_to_process_attempts: List[MachineModel] = []
if attempt_count >= self.max_retries:
for machine_to_process in machines_to_process_attempts:
logging.error(
self.logger.error(
f"Machine {machine_to_process.machine_id} is marked as failing"
)
self.storage.update_or_create_machine(
Expand All @@ -130,12 +134,12 @@ def _send_signals_by_machine_id(
for machine_to_process in machines_to_process_attempts:
machine_to_process = self._prepare_machine(machine_to_process)
if machine_to_process.is_failing:
logging.error(
self.logger.error(
f"skipping sending signals for machine {machine_to_process.machine_id} as it's marked as failing"
)
continue

logging.info(
self.logger.info(
f"sending signals for machine {machine_to_process.machine_id}"
)
try:
Expand All @@ -144,7 +148,7 @@ def _send_signals_by_machine_id(
signals_by_machineid[machine_to_process.machine_id],
)
except httpx.HTTPStatusError as exc:
logging.error(
self.logger.error(
f"error while sending signals: {exc} for machine {machine_to_process.machine_id}"
)
if exc.response.status_code == 401:
Expand All @@ -157,19 +161,19 @@ def _send_signals_by_machine_id(
retry_machines_to_process_attempts.append(machine_to_process)
continue
if prune_after_send:
logging.info(
self.logger.info(
f"pruning sent signals for machine {machine_to_process.machine_id}"
)
self._prune_sent_signals()

logging.info(
self.logger.info(
f"sending metrics for machine {machine_to_process.machine_id}"
)

try:
self._send_metrics_for_machine(machine_to_process)
except httpx.HTTPStatusError as exc:
logging.error(
self.logger.error(
f"Error while sending metrics: {exc} for machine {machine_to_process.machine_id}"
)

Expand All @@ -178,7 +182,7 @@ def _send_signals_by_machine_id(
if (len(retry_machines_to_process_attempts) != 0) and (
attempt_count < self.max_retries
):
logging.info(
self.logger.info(
f"waiting {self.retry_delay} seconds before retrying sending signals"
)
time.sleep(self.retry_delay)
Expand Down Expand Up @@ -219,7 +223,7 @@ def _send_metrics_for_machine(self, machine: MachineModel):
resp.raise_for_status()
break
except httpx.HTTPStatusError as exc:
logging.error(
self.logger.error(
f"received error {exc} while sending metrics for machine {machine.machine_id}"
)

Expand Down Expand Up @@ -247,7 +251,7 @@ def _refresh_machine_token(self, machine: MachineModel) -> MachineModel:
try:
resp.raise_for_status()
except httpx.HTTPStatusError as exc:
logging.error(
self.logger.error(
"Error while refreshing token: machine_id might be already registered or password is wrong"
)
raise exc
Expand All @@ -259,7 +263,7 @@ def _refresh_machine_token(self, machine: MachineModel) -> MachineModel:
return new_machine

def _register_machine(self, machine: MachineModel) -> MachineModel:
logging.info(f"registering machine {machine.machine_id}")
self.logger.info(f"registering machine {machine.machine_id}")
machine.password = (
machine.password if machine.password else secrets.token_urlsafe(32)
)
Expand All @@ -276,7 +280,7 @@ def _register_machine(self, machine: MachineModel) -> MachineModel:
def _prepare_machine(self, machine: MachineModel):
machine = self._ensure_machine_capi_registered(machine)
if machine.is_failing:
logging.error(
self.logger.error(
f"skipping connection for machine {machine.machine_id} as it's marked as failing"
)
return machine
Expand All @@ -292,7 +296,7 @@ def _ensure_machine_capi_registered(self, machine: MachineModel) -> MachineModel

def _ensure_machine_capi_connected(self, machine: MachineModel) -> MachineModel:
if not has_valid_token(
machine, self.latency_offset
machine, self.latency_offset, self.logger
) or not self._has_valid_scenarios(machine):
return self._refresh_machine_token(machine)
return machine
Expand Down Expand Up @@ -328,7 +332,7 @@ def enroll_machines(
for machine_id in machine_ids:
machine = self._prepare_machine(MachineModel(machine_id=machine_id))
if machine.is_failing:
logging.error(
self.logger.error(
f"skipping enrollment for machine {machine.machine_id} as it's marked as failing"
)
continue
Expand All @@ -346,7 +350,7 @@ def enroll_machines(
except httpx.HTTPStatusError as exc:
if exc.response.status_code == 401:
if attempt_count >= self.max_retries:
logging.error(
self.logger.error(
f"Error while enrolling machine {machine_id}: {exc}"
)
continue
Expand Down
9 changes: 9 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import httpx
import jwt
import pytest
import logging
from dacite import from_dict
from pytest_httpx import HTTPXMock

Expand All @@ -47,6 +48,13 @@
from cscapi.sql_storage import SQLStorage
from cscapi.storage import MachineModel, SignalModel

logger = logging.getLogger("capi-py-sdk")
logger.setLevel(logging.DEBUG) # Change this to the level you want
console_handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)


def mock_signals():
return [
Expand Down Expand Up @@ -115,6 +123,7 @@ def client(storage):
scenarios=["crowdsecurity/http-bf", "crowdsecurity/ssh-bf"],
max_retries=1,
retry_delay=0,
logger=logger,
),
)

Expand Down