From 762276e2f16b56e72894e31de3e1ff7d06db3bb5 Mon Sep 17 00:00:00 2001 From: detj Date: Tue, 24 Dec 2024 07:56:08 +0530 Subject: [PATCH] chore(backend): cleanup expired resources - cleanup expired event filters - cleanup expired event metrics - cleanup expired span filters - cleanup expired span metrics - cleanup expired sessions - cleanup expired user defined attributes fixes #1530 Signed-off-by: detj --- backend/cleanup/cleanup/cleanup.go | 235 +++++++++++++++++++++++++---- 1 file changed, 207 insertions(+), 28 deletions(-) diff --git a/backend/cleanup/cleanup/cleanup.go b/backend/cleanup/cleanup/cleanup.go index 8210de013..f8acc3c93 100644 --- a/backend/cleanup/cleanup/cleanup.go +++ b/backend/cleanup/cleanup/cleanup.go @@ -32,18 +32,43 @@ type StaleData struct { Attachments []Attachment `json:"attachments"` } +type AppRetention struct { + AppID string `json:"app_id"` + Threshold time.Time `json:"threshold"` +} + func DeleteStaleData(ctx context.Context) { - // Delete shortened filters + // delete shortened filters deleteStaleShortenedFilters(ctx) - // Delete events and attachments - staleData, err := fetchStaleData(ctx) - + // fetch each app's retention thresholds + appRetentions, err := fetchAppRetentions(ctx) if err != nil { - fmt.Printf("Failed to fetch stale data: %v\n", err) + fmt.Printf("Failed to fetch app retentions and stale data: %v\n", err) return } + // delete event filters + deleteEventFilters(ctx, appRetentions) + + // delete event metrics + deleteEventMetrics(ctx, appRetentions) + + // delete span filters + deleteSpanFilters(ctx, appRetentions) + + // delete span metrics + deleteSpanMetrics(ctx, appRetentions) + + // delete user defined attributes + deleteUserDefAttrs(ctx, appRetentions) + + // delete sessions + deleteSessions(ctx, appRetentions) + + // delete events, spans and attachments + staleData := fetchStaleData(ctx, appRetentions) + for _, st := range staleData { // Delete attachments from object storage if len(st.Attachments) > 0 { @@ 
-93,7 +118,7 @@ func DeleteStaleData(ctx context.Context) { } staleDataJson, _ := json.MarshalIndent(staleData, "", " ") - fmt.Printf("Succesfully deleted stale stale data %v\n", string(staleDataJson)) + fmt.Printf("Succesfully deleted stale data %v\n", string(staleDataJson)) } func deleteStaleShortenedFilters(ctx context.Context) { @@ -110,12 +135,162 @@ func deleteStaleShortenedFilters(ctx context.Context) { fmt.Printf("Succesfully deleted stale short filters\n") } -func fetchStaleData(ctx context.Context) ([]StaleData, error) { - var staleData []StaleData +// deleteEventFilters deletes stale event filters for each +// app's retention threshold. +func deleteEventFilters(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("app_filters"). + Where("app_id = toUUID(?)", retention.AppID). + Where("end_of_month < ?", retention.Threshold) + + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale event filters for app id %q: %v\n", retention.AppID, err) + stmt.Close() + continue + } + + stmt.Close() + } + + if errCount < 1 { + fmt.Println("Successfully deleted stale event filters") + } +} + +// deleteEventMetrics deletes stale event metrics for each +// app's retention threshold. +func deleteEventMetrics(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("app_metrics"). + Where("app_id = toUUID(?)", retention.AppID). 
+ Where("timestamp < ?", retention.Threshold) + + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale event metrics for app id %q: %v\n", retention.AppID, err) + stmt.Close() + continue + } + + stmt.Close() + } - // Fetch retention periods from PostgreSQL + if errCount < 1 { + fmt.Println("Successfully deleted stale event metrics") + } +} + +// deleteSpanFilters deletes stale span filters for each +// app's retention threshold. +func deleteSpanFilters(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("span_filters"). + Where("app_id = toUUID(?)", retention.AppID). + Where("end_of_month < ?", retention.Threshold) + + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale span filters for app id %q: %v\n", retention.AppID, err) + stmt.Close() + continue + } + + stmt.Close() + } + + if errCount < 1 { + fmt.Println("Successfully deleted stale span filters") + } +} + +// deleteSpanMetrics deletes stale span metrics for each +// app's retention threshold. +func deleteSpanMetrics(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("span_metrics"). + Where("app_id = toUUID(?)", retention.AppID). + Where("timestamp < ?", retention.Threshold) + + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale span metrics for app id %q: %v\n", retention.AppID, err) + stmt.Close() + continue + } + + stmt.Close() + } + + if errCount < 1 { + fmt.Println("Successfully deleted stale span metrics") + } +} + +// deleteUserDefAttrs deletes stale user defined attributes +// for each app's retention threshold. 
+func deleteUserDefAttrs(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("user_def_attrs"). + Where("app_id = toUUID(?)", retention.AppID). + Where("end_of_month < ?", retention.Threshold) + + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale user defined attributes for app id %q: %v\n", retention.AppID, err) + stmt.Close() + continue + } + + stmt.Close() + } + + if errCount < 1 { + fmt.Println("Successfully deleted stale user defined attributes") + } +} + +// deleteSessions deletes stale sessions for each +// app's retention threshold. +func deleteSessions(ctx context.Context, retentions []AppRetention) { + errCount := 0 + for _, retention := range retentions { + stmt := sqlf. + DeleteFrom("sessions"). + Where("app_id = toUUID(?)", retention.AppID). + Where("first_event_timestamp < ?", retention.Threshold) + + if err := server.Server.ChPool.Exec(ctx, stmt.String(), stmt.Args()...); err != nil { + errCount += 1 + fmt.Printf("Failed to delete stale sessions for app id %q: %v\n", retention.AppID, err) + stmt.Close() + continue + } + + stmt.Close() + } + + if errCount < 1 { + fmt.Println("Successfully deleted stale sessions") + } +} + +// fetchAppRetentions fetches retention period +// for each app. +func fetchAppRetentions(ctx context.Context) (retentions []AppRetention, err error) { + // Fetch retention periods for each app stmt := sqlf.PostgreSQL. - From("public.app_settings"). + From("app_settings"). Select("app_id"). Select("retention_period") @@ -123,27 +298,35 @@ func fetchStaleData(ctx context.Context) ([]StaleData, error) { rows, err := server.Server.PgPool.Query(ctx, stmt.String(), stmt.Args()...) 
if err != nil { - return nil, err + return } - var appID string - var retentionPeriod int - - // For each app_id and retention_period, fetch stale data for rows.Next() { - if err := rows.Scan(&appID, &retentionPeriod); err != nil { + var retention AppRetention + var period int + + if err := rows.Scan(&retention.AppID, &period); err != nil { fmt.Printf("Failed to scan row: %v\n", err) continue } - retentionDate := time.Now().UTC().AddDate(0, 0, -retentionPeriod) + retention.Threshold = time.Now().UTC().AddDate(0, 0, -period) + retentions = append(retentions, retention) + } + + err = rows.Err() + + return +} +func fetchStaleData(ctx context.Context, retentions []AppRetention) (staleData []StaleData) { + for _, retention := range retentions { // Fetch stale events from ClickHouse fetchEventsStmt := sqlf.Select("id"). Select("attachments"). From("default.events"). - Where("app_id = ?", appID). - Where("timestamp < ?", retentionDate) + Where("app_id = toUUID(?)", retention.AppID). + Where("timestamp < ?", retention.Threshold) eventRows, err := server.Server.ChPool.Query(ctx, fetchEventsStmt.String(), fetchEventsStmt.Args()...) if err != nil { @@ -179,8 +362,8 @@ func fetchStaleData(ctx context.Context) ([]StaleData, error) { // Fetch stale spans from ClickHouse fetchSpansStmt := sqlf.Select("span_id"). From("spans"). - Where("app_id = ?", appID). - Where("start_time < ?", retentionDate) + Where("app_id = toUUID(?)", retention.AppID). + Where("start_time < ?", retention.Threshold) spanRows, err := server.Server.ChPool.Query(ctx, fetchSpansStmt.String(), fetchSpansStmt.Args()...) 
if err != nil { @@ -200,19 +383,15 @@ func fetchStaleData(ctx context.Context) ([]StaleData, error) { } staleData = append(staleData, StaleData{ - AppID: appID, - RetentionDate: retentionDate, + AppID: retention.AppID, + RetentionDate: retention.Threshold, EventIDs: staleEventIDs, SpanIDs: staleSpanIDs, Attachments: staleAttachments, }) } - if err := rows.Err(); err != nil { - return nil, err - } else { - return staleData, nil - } + return } func deleteAttachments(ctx context.Context, staleData StaleData) (err error) {