Skip to content

Commit

Permalink
chore(backend): cleanup expired resources
Browse files Browse the repository at this point in the history
- cleanup expired event filters
- cleanup expired evnet metrics
- cleanup expired span filters
- cleanup expired span metrics
- cleanup expired sessions
- cleanup expired user defined attributes

fixes #1530

Signed-off-by: detj <[email protected]>
  • Loading branch information
detj committed Dec 24, 2024
1 parent cd54d5e commit 762276e
Showing 1 changed file with 207 additions and 28 deletions.
235 changes: 207 additions & 28 deletions backend/cleanup/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,43 @@ type StaleData struct {
Attachments []Attachment `json:"attachments"`
}

type AppRetention struct {
AppID string `json:"app_id"`
Threshold time.Time `json:"threshold"`
}

func DeleteStaleData(ctx context.Context) {
// Delete shortened filters
// delete shortened filters
deleteStaleShortenedFilters(ctx)

// Delete events and attachments
staleData, err := fetchStaleData(ctx)

// fetch each app's retention thresholds
appRetentions, err := fetchAppRetentions(ctx)
if err != nil {
fmt.Printf("Failed to fetch stale data: %v\n", err)
fmt.Printf("Failed to fetch app retentions and stale data: %v\n", err)
return
}

// delete event filters
deleteEventFilters(ctx, appRetentions)

// delete event metrics
deleteEventMetrics(ctx, appRetentions)

// delete span filters
deleteSpanFilters(ctx, appRetentions)

// delete span metrics
deleteSpanMetrics(ctx, appRetentions)

// delete user defined attributes
deleteUserDefAttrs(ctx, appRetentions)

// delete sessions
deleteSessions(ctx, appRetentions)

// delete events, spans and attachments
staleData := fetchStaleData(ctx, appRetentions)

for _, st := range staleData {
// Delete attachments from object storage
if len(st.Attachments) > 0 {
Expand Down Expand Up @@ -93,7 +118,7 @@ func DeleteStaleData(ctx context.Context) {
}

staleDataJson, _ := json.MarshalIndent(staleData, "", " ")
fmt.Printf("Succesfully deleted stale stale data %v\n", string(staleDataJson))
fmt.Printf("Succesfully deleted stale data %v\n", string(staleDataJson))
}

func deleteStaleShortenedFilters(ctx context.Context) {
Expand All @@ -110,40 +135,198 @@ func deleteStaleShortenedFilters(ctx context.Context) {
fmt.Printf("Succesfully deleted stale short filters\n")
}

func fetchStaleData(ctx context.Context) ([]StaleData, error) {
var staleData []StaleData
// deleteEventFilters deletes stale event filters for each
// app's retention threshold.
func deleteEventFilters(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("app_filters").
Where("app_id = toUUID(?)", retention.AppID).
Where("end_of_month < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale event filters for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

if errCount < 1 {
fmt.Println("Successfully deleted stale event filters")
}
}

// deleteEventMetrics deletes stale event metrics for each
// app's retention threshold.
func deleteEventMetrics(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("app_metrics").
Where("app_id = toUUID(?)", retention.AppID).
Where("timestamp < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale event metrics for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

// Fetch retention periods from PostgreSQL
if errCount < 1 {
fmt.Println("Successfully deleted stale event metrics")
}
}

// deleteSpanFilters deletes stale span filters for each
// app's retention threshold.
func deleteSpanFilters(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("span_filters").
Where("app_id = toUUID(?)", retention.AppID).
Where("end_of_month < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale span filters for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

if errCount < 1 {
fmt.Println("Successfully deleted stale span filters")
}
}

// deleteSpanMetrics deletes stale span metrics for each
// app's retention threshold.
func deleteSpanMetrics(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("span_metrics").
Where("app_id = toUUID(?)", retention.AppID).
Where("timestamp < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale span metrics for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

if errCount < 1 {
fmt.Println("Successfully deleted stale span metrics")
}
}

// deleteUserDefAttrs deletes stale user defined attributes
// for each app's retention threshold.
func deleteUserDefAttrs(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("user_def_attrs").
Where("app_id = toUUID(?)", retention.AppID).
Where("end_of_month < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale user defined attributes for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

if errCount < 1 {
fmt.Println("Successfully deleted stale user defined attributes")
}
}

// deleteSessions deletes stale sessions for each
// app's retention threshold.
func deleteSessions(ctx context.Context, retentions []AppRetention) {
errCount := 0
for _, retention := range retentions {
stmt := sqlf.
DeleteFrom("sessions").
Where("app_id = toUUID(?)", retention.AppID).
Where("first_event_timestamp < ?", retention.Threshold)

if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil {
errCount += 1
fmt.Printf("Failed to delete stale sessions for app id %q: %v\n", retention.AppID, err)
stmt.Close()
continue
}

stmt.Close()
}

if errCount < 1 {
fmt.Println("Successfully deleted stale sessions")
}
}

// fetchAppRetentions fetches retention period
// for each app.
func fetchAppRetentions(ctx context.Context) (retentions []AppRetention, err error) {
// Fetch retention periods for each app
stmt := sqlf.PostgreSQL.
From("public.app_settings").
From("app_settings").
Select("app_id").
Select("retention_period")

defer stmt.Close()

rows, err := server.Server.PgPool.Query(ctx, stmt.String(), stmt.Args()...)
if err != nil {
return nil, err
return
}

var appID string
var retentionPeriod int

// For each app_id and retention_period, fetch stale data
for rows.Next() {
if err := rows.Scan(&appID, &retentionPeriod); err != nil {
var retention AppRetention
var period int

if err := rows.Scan(&retention.AppID, &period); err != nil {
fmt.Printf("Failed to scan row: %v\n", err)
continue
}

retentionDate := time.Now().UTC().AddDate(0, 0, -retentionPeriod)
retention.Threshold = time.Now().UTC().AddDate(0, 0, -period)
retentions = append(retentions, retention)
}

err = rows.Err()

return
}

func fetchStaleData(ctx context.Context, retentions []AppRetention) (staleData []StaleData) {
for _, retention := range retentions {
// Fetch stale events from ClickHouse
fetchEventsStmt := sqlf.Select("id").
Select("attachments").
From("default.events").
Where("app_id = ?", appID).
Where("timestamp < ?", retentionDate)
Where("app_id = toUUID(?)", retention.AppID).
Where("timestamp < ?", retention.Threshold)

eventRows, err := server.Server.ChPool.Query(ctx, fetchEventsStmt.String(), fetchEventsStmt.Args()...)
if err != nil {
Expand Down Expand Up @@ -179,8 +362,8 @@ func fetchStaleData(ctx context.Context) ([]StaleData, error) {
// Fetch stale spans from ClickHouse
fetchSpansStmt := sqlf.Select("span_id").
From("spans").
Where("app_id = ?", appID).
Where("start_time < ?", retentionDate)
Where("app_id = toUUID(?)", retention.AppID).
Where("start_time < ?", retention.Threshold)

spanRows, err := server.Server.ChPool.Query(ctx, fetchSpansStmt.String(), fetchSpansStmt.Args()...)
if err != nil {
Expand All @@ -200,19 +383,15 @@ func fetchStaleData(ctx context.Context) ([]StaleData, error) {
}

staleData = append(staleData, StaleData{
AppID: appID,
RetentionDate: retentionDate,
AppID: retention.AppID,
RetentionDate: retention.Threshold,
EventIDs: staleEventIDs,
SpanIDs: staleSpanIDs,
Attachments: staleAttachments,
})
}

if err := rows.Err(); err != nil {
return nil, err
} else {
return staleData, nil
}
return
}

func deleteAttachments(ctx context.Context, staleData StaleData) (err error) {
Expand Down

0 comments on commit 762276e

Please sign in to comment.