-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial commit for audit logs #160
Changes from 1 commit
1196928
de5a5f3
7757c01
96173db
e62027b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,9 @@ | ||
import typing as t | ||
import uuid | ||
from datetime import datetime | ||
from enum import Enum | ||
|
||
from piccolo.apps.user.tables import BaseUser | ||
from piccolo.columns import Text, Timestamp, Varchar | ||
from piccolo.columns import JSON, Text, Timestamp, Varchar | ||
from piccolo.table import Table | ||
|
||
|
||
|
@@ -20,9 +19,15 @@ class ActionType(str, Enum): | |
action_type = Varchar(choices=ActionType) | ||
action_user = Varchar() | ||
change_message = Text() | ||
changes_in_row = JSON() | ||
|
||
@classmethod | ||
async def post_save_action(cls, table: t.Type[Table], user_id: int): | ||
async def record_save_action( | ||
cls, | ||
table: t.Type[Table], | ||
user_id: int, | ||
new_row_id=t.Union[str, uuid.UUID, int], | ||
): | ||
""" | ||
A method for tracking creating record actions. | ||
|
||
|
@@ -32,20 +37,21 @@ async def post_save_action(cls, table: t.Type[Table], user_id: int): | |
The ``primary key`` of authenticated user. | ||
""" | ||
result = cls( | ||
action_time=datetime.now(), | ||
action_type=cls.ActionType.CREATING, | ||
action_user=cls.get_user_username(user_id), | ||
change_message=f"User {cls.get_user_username(user_id)} " | ||
f"create new row in {table._meta.tablename.title()} table", | ||
f"create row {new_row_id} in {table._meta.tablename.title()} " | ||
f"table", | ||
) | ||
await result.save().run() | ||
|
||
@classmethod | ||
async def post_patch_action( | ||
async def record_patch_action( | ||
cls, | ||
table: t.Type[Table], | ||
row_id: t.Union[str, uuid.UUID, int], | ||
user_id: int, | ||
changes_in_row: t.Dict[str, t.Any], | ||
): | ||
""" | ||
A method for tracking updating record actions. | ||
|
@@ -57,18 +63,20 @@ async def post_patch_action( | |
monitor activities. | ||
:param user_id: | ||
The ``primary key`` of authenticated user. | ||
:param changes_in_row: | ||
JSON with all changed columns in the existing row. | ||
""" | ||
result = cls( | ||
action_time=datetime.now(), | ||
action_type=cls.ActionType.UPDATING, | ||
action_user=cls.get_user_username(user_id), | ||
change_message=f"User {cls.get_user_username(user_id)} update row " | ||
f"{row_id} in {table._meta.tablename.title()} table", | ||
changes_in_row=changes_in_row, | ||
) | ||
await result.save().run() | ||
|
||
@classmethod | ||
async def post_delete_action( | ||
async def record_delete_action( | ||
cls, | ||
table: t.Type[Table], | ||
row_id: t.Union[str, uuid.UUID, int], | ||
|
@@ -86,7 +94,6 @@ async def post_delete_action( | |
The ``primary key`` of authenticated user. | ||
""" | ||
result = cls( | ||
action_time=datetime.now(), | ||
action_type=cls.ActionType.DELETING, | ||
action_user=cls.get_user_username(user_id), | ||
change_message=f"User {cls.get_user_username(user_id)} delete row " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd be tempted to get rid of the change message all together. One option is just storing everything in JSON. For delete, the json can be:
Likewise, for the save, it could be:
Or we have a field called It's just about making it easy to query - imagine you wanted to know who deleted row 42. You would have to something like: await AuditLog.select().where(
AuditLog.action_type == AuditLog.ActionType.DELETING,
AuditLog.change_message.like('%42%')
) It's quite imprecise, ad would be better to just do: await AuditLog.select(AuditLog.action_user).where(
AuditLog.action_type == AuditLog.ActionType.DELETING,
AuditLog.effected_rows.any(42)
) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly, table name should probably be a column too, to make it easier to filter out the changes on a certain table. I'd be tempted to add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
But all actions are for endpoints that work with a single record. We don't have the
That would be possible if we have only
Here is screenshot with new columns. I don't think we should remove the
I added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good points.
You're right, there's no bulk action endpoints at the moment. But I was just thinking about the future. Imagine someone deletes 1000 rows - we would then have to insert 1000 rows into the audit table.
Good point - the primary key might not be an integer. Storing them all as strings is OK.
Maybe
I was just thinking that if there's a table class called There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I agree, but we don’t currently have the classic bulk delete in Piccolo Admin (we loop through # in piccolo_api.crud.endpoints.py
@apply_validators
async def delete_bulk(
self, request: Request, params: t.Optional[t.Dict[str, t.Any]] = None
) -> Response:
"""
Bulk deletes rows - query parameters are used for filtering.
"""
params = self._clean_data(params) if params else {}
split_params = self._split_params(params)
split_params_ids = split_params.ids.split(",")
try:
query: t.Union[
Select, Count, Objects, Delete
] = self.table.delete()
try:
# Serial or UUID primary keys enabled in query params
value_type = self.table._meta.primary_key.value_type
ids = [value_type(item) for item in split_params_ids]
query_ids = query.where(
self.table._meta.primary_key.is_in(ids)
)
query = self._apply_filters(query_ids, split_params)
# this is not tested but it should be work
# in ids list can be one or more items
deleted_rows_map = {"deleted_ids": ids}
try:
if self.audit_log_table:
await self.audit_log_table.record_delete_action(
self.table,
deleted_rows_pk=deleted_rows_map,
user_id=request.user.user_id,
)
except Exception as exception:
logger.log(msg=f"{exception}", level=logging.WARNING)
except ValueError:
query = self._apply_filters(query, split_params)
except MalformedQuery as exception:
return Response(str(exception), status_code=400)
await query.run()
return Response(status_code=204)
# in piccolo_api.audit_logs.tables.py
class AuditLog(Table):
...
deleted_rows = JSON()
...
@classmethod
async def record_delete_action(
cls,
table: t.Type[Table],
deleted_rows_pk: t.Dict[str, t.Any],
user_id: int,
):
result = cls(
action_type=cls.ActionType.deleting,
action_user=cls.get_user_username(user_id),
deleted_rows=deleted_rows_pk,
)
await result.save().run() But for now we don't have that option because we're looping through the // Maps to a list, so we can also use it for a bulk delete endpoint.
{
"deleted_ids": [1, 2]
} We should stick to what we have at the moment, and later we can always change (when we have
I think |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,6 +149,7 @@ def __init__( | |
schema_extra: t.Optional[t.Dict[str, t.Any]] = None, | ||
max_joins: int = 0, | ||
hooks: t.Optional[t.List[Hook]] = None, | ||
audit_log_table: t.Optional[t.Type[AuditLog]] = None, | ||
) -> None: | ||
""" | ||
:param table: | ||
|
@@ -218,6 +219,7 @@ def __init__( | |
} | ||
else: | ||
self._hook_map = None # type: ignore | ||
self.audit_log_table = audit_log_table or AuditLog | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no need to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But without that we cannot add records to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even if people don't register the In def create_admin(..., audit_log_table: AuditLog = AuditLog, enable_auditing: t.List[t.Type[Table]] = []) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Later today I will try to do that in Piccolo Admin. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right about this. We need to add a few lines of code to Piccolo Admin and everything works great and we can choose which table we want audit logs for in future |
||
|
||
schema_extra = schema_extra if isinstance(schema_extra, dict) else {} | ||
self.visible_fields_options = get_visible_fields_options( | ||
|
@@ -335,6 +337,22 @@ def pydantic_model_plural( | |
rows=(t.List[base_model], None), | ||
) | ||
|
||
def get_single_row( | ||
self, | ||
table: t.Type[Table], | ||
row_id: t.Union[str, uuid.UUID, int], | ||
) -> t.Dict[str, t.Any]: | ||
""" | ||
Return a single row. | ||
""" | ||
row = ( | ||
self.table.select(exclude_secrets=self.exclude_secrets) | ||
.where(self.table._meta.primary_key == row_id) | ||
.first() | ||
.run_sync() | ||
) | ||
return row | ||
|
||
@apply_validators | ||
async def get_schema(self, request: Request) -> JSONResponse: | ||
""" | ||
|
@@ -813,13 +831,17 @@ async def post_single( | |
row = await execute_post_hooks( | ||
hooks=self._hook_map, hook_type=HookType.pre_save, row=row | ||
) | ||
try: | ||
await AuditLog.post_save_action( | ||
self.table, user_id=request.user.user_id | ||
) | ||
except AssertionError: | ||
pass | ||
response = await row.save().run() | ||
new_row_id = list(response[0].values()) | ||
try: | ||
if self.audit_log_table: | ||
await self.audit_log_table.record_save_action( | ||
self.table, | ||
user_id=request.user.user_id, | ||
new_row_id=new_row_id[0], | ||
) | ||
except Exception as exception: | ||
logger.log(msg=f"{exception}", level=logging.WARNING) | ||
json = dump_json(response) | ||
# Returns the id of the inserted row. | ||
return CustomJSONResponse(json, status_code=201) | ||
|
@@ -1064,21 +1086,24 @@ async def patch_single( | |
) | ||
|
||
try: | ||
old_row = self.get_single_row(cls, row_id) | ||
await cls.update(values).where( | ||
cls._meta.primary_key == row_id | ||
).run() | ||
new_row = self.get_single_row(cls, row_id) | ||
changes_in_row = { | ||
k: v for k, v in new_row.items() - old_row.items() | ||
} | ||
try: | ||
await AuditLog.post_patch_action( | ||
cls, row_id=row_id, user_id=request.user.user_id | ||
) | ||
except AssertionError: | ||
pass | ||
new_row = ( | ||
await cls.select(exclude_secrets=self.exclude_secrets) | ||
.where(cls._meta.primary_key == row_id) | ||
.first() | ||
.run() | ||
) | ||
if self.audit_log_table: | ||
await self.audit_log_table.record_patch_action( | ||
cls, | ||
row_id=row_id, | ||
user_id=request.user.user_id, | ||
changes_in_row=changes_in_row, | ||
) | ||
except Exception as exception: | ||
logger.log(msg=f"{exception}", level=logging.WARNING) | ||
return CustomJSONResponse(self.pydantic_model(**new_row).json()) | ||
except ValueError: | ||
return Response("Unable to save the resource.", status_code=500) | ||
|
@@ -1103,11 +1128,12 @@ async def delete_single( | |
self.table._meta.primary_key == row_id | ||
).run() | ||
try: | ||
await AuditLog.post_delete_action( | ||
self.table, row_id=row_id, user_id=request.user.user_id | ||
) | ||
except AssertionError: | ||
pass | ||
if self.audit_log_table: | ||
await self.audit_log_table.record_delete_action( | ||
self.table, row_id=row_id, user_id=request.user.user_id | ||
) | ||
except Exception as exception: | ||
logger.log(msg=f"{exception}", level=logging.WARNING) | ||
return Response(status_code=204) | ||
except ValueError: | ||
return Response("Unable to delete the resource.", status_code=500) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think one really nice thing would be recording which changes were made. There could be a JSON field which would have something like
{'name': 'bob'}
if the name field was changed to 'bob'.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea. I will try to implement it.