Skip to content

Commit

Permalink
bug_trail
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewdeanmartin committed Jan 2, 2024
1 parent d52ca43 commit 89afee0
Show file tree
Hide file tree
Showing 13 changed files with 653 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,5 @@ ai_shell/dialog_log/
/api_logs/

ai_shell.toml
/bug_trail/logs/
/bug_trail/error_log.db
35 changes: 35 additions & 0 deletions bug_trail/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Bug Trail

This is a workstation logger to capture bugs encountered while you are writing code.

## Installation

```bash
pip install bug-trail
```

## Usage

```python
import bug_trail
import logging

db_path = "error_log.db"
handler = bug_trail.ErrorLogSQLiteHandler(db_path)
logging.basicConfig(handlers=[handler], level=logging.ERROR)

logger = logging.getLogger(__name__)
logger.error("This is an error message")
```

To generate to the log folder relative to the current working directory:

```bash
bug_trail --output logs --db error_log.db
```

## Security
None. Do not publish your error log to the internet. Add the log folder to your .gitignore file.

## Prior Art
Inspired by elmah. Much less ambitious, as this is just a browsable, static HTML report.
6 changes: 6 additions & 0 deletions bug_trail/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
Captures error logs to sqlite. Use CLI or
"""
__all__ = ["ErrorLogSQLiteHandler"]

from bug_trail.handlers import ErrorLogSQLiteHandler
37 changes: 37 additions & 0 deletions bug_trail/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import argparse
import sys

from bug_trail.fs_utils import clear_data, prompt_and_update_gitignore
from bug_trail.views import render_all


def main() -> int:
"""
Main entry point for the CLI.
Returns:
int: 0 if successful, 1 if not
"""
parser = argparse.ArgumentParser(description="Tool for local logging and error reporting.")
parser.add_argument("--clear", action="store_true", help="Clear the database and log files")

parser.add_argument("--output", action="store_true", help="Where to output the logs", default="logs")
parser.add_argument("--db", action="store_true", help="Where to store the database", default="error_log.db")

parser.add_argument("--version", action="version", version="%(prog)s 1.0")

args = parser.parse_args()
db_path = args.db
log_folder = args.output
if args.clear:
clear_data(log_folder, db_path)
return 0

prompt_and_update_gitignore(".")
# Default actions
render_all(db_path, log_folder)
return 0


if __name__ == "__main__":
sys.exit(main())
123 changes: 123 additions & 0 deletions bug_trail/data_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import datetime
import sqlite3
from typing import Any


def serialize_to_sqlite_supported(value: str) -> Any:
"""
sqlite supports None, int, float, str, bytes by default, and also knows how to adapt datetime.date and datetime.datetime
everything else is str(value)
"""
if value is None:
return value
if isinstance(value, (int, float, str, bytes)):
return value
if isinstance(value, (datetime.date, datetime.datetime)):
return value
return str(value)


def fetch_log_data(db_path: str) -> list[dict[str, Any]]:
"""
Fetch all log records from the database.
Args:
db_path (str): Path to the SQLite database
Returns:
list[dict[str, Any]]: A list of dictionaries containing all log records
"""
# Connect to the SQLite database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Query to fetch all rows from the logs table
query = "SELECT * FROM logs"
cursor.execute(query)

# Fetching column names from the cursor
columns = [description[0] for description in cursor.description]

# Fetch all rows, and convert each row to a dictionary
rows = cursor.fetchall()
log_data = []
for row in rows:
log_record = dict(zip(columns, row, strict=True))
log_data.append(log_record)

# Close the connection
conn.close()
return log_data


def fetch_log_data_grouped(db_path: str) -> Any:
"""
Fetch all log records from the database, and group them into a nested dictionary.
Args:
db_path (str): Path to the SQLite database
Returns:
Any: A nested dictionary containing all log records
"""
# Connect to the SQLite database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Query to fetch all rows from the logs table
query = "SELECT * FROM logs"
cursor.execute(query)

# Fetching column names from the cursor
columns = [description[0] for description in cursor.description]

# Fetch all rows, and convert each row to a grouped dictionary
rows = cursor.fetchall()
log_data = []
for row in rows:
log_record = dict(zip(columns, row, strict=True))

# Grouping the log record
grouped_record = {
"MessageDetails": {key: log_record[key] for key in ["msg", "args", "levelname", "levelno"]},
"SourceContext": {
key: log_record[key] for key in ["name", "pathname", "filename", "module", "funcName", "lineno"]
},
"TemporalDetails": {key: log_record[key] for key in ["created", "msecs", "relativeCreated"]},
"ProcessThreadContext": {
key: log_record[key] for key in ["process", "processName", "thread", "threadName"]
},
"ExceptionDetails": {key: log_record[key] for key in ["exc_info", "exc_text"]},
"StackDetails": {key: log_record[key] for key in ["stack_info"]},
"UserData": {
key: log_record[key]
for key in log_record.keys()
- {
"msg",
"args",
"levelname",
"levelno",
"name",
"pathname",
"filename",
"module",
"funcName",
"lineno",
"created",
"msecs",
"relativeCreated",
"process",
"processName",
"thread",
"threadName",
"exc_info",
"exc_text",
"stack_info",
}
},
}
log_data.append(grouped_record)

# Close the connection
conn.close()
return log_data
29 changes: 29 additions & 0 deletions bug_trail/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Example usage"""

# Set up logging
import logging

from bug_trail.handlers import ErrorLogSQLiteHandler

db_path = "error_log.db"
handler = ErrorLogSQLiteHandler(db_path)
logging.basicConfig(handlers=[handler], level=logging.ERROR)

# Example usage
logger = logging.getLogger(__name__)
logger.error("This is an error message")


def run():
# Example usage
logger2 = logging.getLogger("adhoc")
logger2.error("This is an ad hoc error message")

logger.error("This is an error message")
try:
_ = 1 / 0
except ZeroDivisionError as e:
logger.exception(e)


run()
83 changes: 83 additions & 0 deletions bug_trail/fs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
This module contains functions related to file system operations.
"""
import os
import shutil


def empty_folder(folder_path: str) -> None:
"""
Empty the folder at the given path
Args:
folder_path (str): Path to the folder to be emptied
"""
if os.path.exists(folder_path) and os.path.isdir(folder_path):
shutil.rmtree(folder_path)
os.makedirs(folder_path, exist_ok=True)


def clear_data(log_folder: str, db_path: str) -> None:
"""
Clear the database and log files
"""
# Code to clear the database and log files
empty_folder(log_folder)
os.remove(db_path)


def get_containing_folder_path(file_path: str) -> str:
"""
Get the absolute path of the folder containing the given file.
Args:
file_path (str): Path to the file (__file__)
Returns:
str: Absolute path of the containing folder
"""
return os.path.abspath(os.path.dirname(file_path))


def is_git_repo(path: str) -> bool:
"""
Check if the path is inside a git repository by looking for a .git directory.
Args:
path (str): The directory path to check.
Returns:
bool: True if inside a git repo, False otherwise.
"""
current_path = path
while current_path != os.path.dirname(current_path):
if os.path.isdir(os.path.join(current_path, ".git")):
return True
current_path = os.path.dirname(current_path)
return False


def prompt_and_update_gitignore(repo_path: str) -> None:
"""Prompt the user to ignore logs and update .gitignore accordingly."""
if not is_git_repo(repo_path):
return

gitignore_path = os.path.join(repo_path, ".gitignore")

# Check if .gitignore exists and 'logs' is already listed
if os.path.exists(gitignore_path):
with open(gitignore_path, encoding="utf-8") as file:
if "logs" in file.read():
print("'logs' directory is already ignored in .gitignore.")
return

# Prompt user for action
response = (
input("This directory is a Git repository. Do you want to ignore 'logs' directory? (y/n): ").strip().lower()
)
if (response.lower() + "xxx")[0] == "y":
with open(gitignore_path, "a", encoding="utf-8") as file:
file.write("\nlogs/")
print("'logs' directory is now ignored in .gitignore.")
else:
print("No changes made to .gitignore.")
Loading

0 comments on commit 89afee0

Please sign in to comment.