Skip to content

Commit

Permalink
fix(backend): prevent duplicate ingestion of events
Browse files Browse the repository at this point in the history
- add postgres migration to add `status` column in `event_reqs` table
- prevent duplicate ingestion of events by keeping track of a `status`
column in `event_reqs` table
- send `retry-after` response header if another identical request is
being processed
- roll back gracefully to clean up event requests in `pending` state if
the request fails at any point
- use defer to end all otel spans in ingestion

fixes #1277

Signed-off-by: detj <[email protected]>
  • Loading branch information
detj committed Oct 12, 2024
1 parent a4de8bb commit 1465024
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 36 deletions.
192 changes: 156 additions & 36 deletions backend/api/measure/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"path/filepath"
"slices"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/leporo/sqlf"
"go.opentelemetry.io/otel"
)
Expand All @@ -31,6 +33,37 @@ import (
// size of event request in bytes.
var maxBatchSize = 20 * 1024 * 1024

// retryAfter is how long a client is asked to wait, via the
// Retry-After response header, before resubmitting an event
// request that is still being processed.
const retryAfter = time.Minute

// status enumerates the processing states of an
// event request.
type status int

const (
	// pending means the event request is still
	// being processed.
	pending status = 0

	// done means the event request has finished
	// processing.
	done status = 1
)

// String returns the lower-case name of the status,
// or "unknown" for any value that is not a declared
// status.
func (s status) String() string {
	if s == pending {
		return "pending"
	}
	if s == done {
		return "done"
	}
	return "unknown"
}

type attachment struct {
id uuid.UUID
name string
Expand Down Expand Up @@ -227,40 +260,46 @@ func (e *eventreq) infuseInet(rawIP string) error {
return nil
}

// seen checks if the event request has been
// processed already or not.
func (e eventreq) seen(ctx context.Context) (seen bool, err error) {
stmt := sqlf.PostgreSQL.
From(`public.event_reqs`).
Select("1").
// getStatus gets the status of an event request.
func (e eventreq) getStatus(ctx context.Context) (s *status, err error) {
stmt := sqlf.PostgreSQL.From(`event_reqs`).
Select("status").
Where("id = ? and app_id = ?", e.id, e.appId)

defer stmt.Close()

if err = server.Server.PgPool.QueryRow(ctx, stmt.String(), stmt.Args()...).Scan(nil); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return false, nil
}
return
}

seen = true
err = server.Server.PgPool.QueryRow(ctx, stmt.String(), stmt.Args()...).Scan(&s)

return
}

// save saves the processed event request to persistent
// start inserts a new pending event request in persistent
// storage.
func (e eventreq) save(ctx context.Context, tx *pgx.Tx) (err error) {
func (e eventreq) start(ctx context.Context) (err error) {
stmt := sqlf.PostgreSQL.
InsertInto(`public.event_reqs`).
Set(`id`, e.id).
Set(`app_id`, e.appId).
Set(`status`, pending)

defer stmt.Close()

_, err = server.Server.PgPool.Exec(ctx, stmt.String(), stmt.Args()...)

return
}

// end saves the event request batch marking its
// status as "done".
func (e eventreq) end(ctx context.Context, tx *pgx.Tx) (err error) {
stmt := sqlf.PostgreSQL.Update(`public.event_reqs`).
Set(`event_count`, len(e.events)).
Set(`attachment_count`, len(e.attachments)).
Set(`session_count`, e.sessionCount()).
Set(`bytes_in`, e.size).
Set(`symbolication_attempts_count`, e.symbolicationAttempted)
Set(`symbolication_attempts_count`, e.symbolicationAttempted).
Set(`status`, done).
Where("id = ? and app_id = ?", e.id, e.appId)

defer stmt.Close()

Expand All @@ -269,6 +308,35 @@ func (e eventreq) save(ctx context.Context, tx *pgx.Tx) (err error) {
return
}

// cleanup deletes the event request's row if, and only
// if, it is still in pending state. A missing row, a
// null status or a non-pending status is a no-op that
// returns nil. Any other lookup error is returned as is.
func (e eventreq) cleanup(ctx context.Context) (err error) {
	current, err := e.getStatus(ctx)

	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// nothing was recorded, so nothing to remove
		return nil
	case err != nil:
		return err
	case current == nil || *current != pending:
		// only pending requests are considered dangling
		return nil
	}

	// remove event request in pending state
	del := sqlf.PostgreSQL.
		DeleteFrom(`event_reqs`).
		Where("id = ? and app_id = ? and status = ?", e.id, e.appId, pending)
	defer del.Close()

	_, err = server.Server.PgPool.Exec(ctx, del.String(), del.Args()...)

	return err
}

// hasUnhandledExceptions returns true if event payload
// contains unhandled exceptions.
func (e eventreq) hasUnhandledExceptions() bool {
Expand Down Expand Up @@ -2086,20 +2154,72 @@ func PutEvents(c *gin.Context) {
return
}

if seen, err := eventReq.seen(ctx); err != nil {
msg := `failed to check existing event request`
fmt.Println(msg, err.Error())
// there's a possibility that a previous event request is already
// in progress or was processed.
//
// if it's in progress, we don't know whether it will succeed or
// not, so we ask the client to retry after some time.
//
// if it was processed, tell the client that this event request
// was seen previously and we ignore this request.
rs, err := eventReq.getStatus(ctx)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
msg := "failed to check status of event request"
fmt.Println(msg, err)
c.JSON(http.StatusInternalServerError, gin.H{
"error": msg,
})
return
} else if seen {
c.JSON(http.StatusAccepted, gin.H{
"ok": "accepted, known event request",
}

if rs != nil {
switch *rs {
case pending:
durStr := fmt.Sprintf("%d", int64(retryAfter.Seconds()))
c.Header("Retry-After", durStr)
c.JSON(http.StatusAccepted, gin.H{
"ok": fmt.Sprintf("previous accepted request %q is in progress, retry after %s seconds", eventReq.id, durStr),
})
return
case done:
c.JSON(http.StatusAccepted, gin.H{
"ok": "accepted, known event request",
})
return
}
}

// start by recording the event request so that
// we can deterministically prevent duplication.
if err := eventReq.start(ctx); err != nil {
// detect primary key violations
if pgErr, ok := err.(*pgconn.PgError); ok {
if pgErr.Code == "23505" {
durStr := fmt.Sprintf("%d", int64(retryAfter.Seconds()))
c.Header("Retry-After", durStr)
c.JSON(http.StatusAccepted, gin.H{
"ok": fmt.Sprintf("previous accepted request %q is in progress, retry after %s seconds", eventReq.id, durStr),
})
return
}
}

msg := "failed to start ingestion"
fmt.Println(msg, err)
c.JSON(http.StatusInternalServerError, gin.H{
"error": msg,
})
return
}

// cleanup at the end
defer func() {
if err := eventReq.cleanup(ctx); err != nil {
msg := "failed to cleanup event request"
fmt.Println(msg, err)
}
}()

if err := eventReq.infuseInet(c.ClientIP()); err != nil {
msg := fmt.Sprintf(`failed to lookup country info for IP: %q`, c.ClientIP())
fmt.Println(msg, err)
Expand Down Expand Up @@ -2134,6 +2254,8 @@ func PutEvents(c *gin.Context) {
symbolicationTracer := otel.Tracer("symbolication-tracer")
_, symbolicationSpan := symbolicationTracer.Start(ctx, "symbolicate-events")

defer symbolicationSpan.End()

for i := range batches {
// If symbolication fails for the whole batch, continue
if err := symbolicator.Symbolicate(ctx, batches[i]); err != nil {
Expand Down Expand Up @@ -2162,8 +2284,6 @@ func PutEvents(c *gin.Context) {
}
}

symbolicationSpan.End()

eventReq.bumpSymbolication()
}

Expand All @@ -2172,13 +2292,14 @@ func PutEvents(c *gin.Context) {
uploadAttachmentsTracer := otel.Tracer("upload-attachments-tracer")
_, uploadAttachmentSpan := uploadAttachmentsTracer.Start(ctx, "upload-attachments")

defer uploadAttachmentSpan.End()

if err := eventReq.uploadAttachments(); err != nil {
msg := `failed to upload attachments`
fmt.Println(msg, err)
c.JSON(http.StatusInternalServerError, gin.H{
"error": msg,
})
uploadAttachmentSpan.End()
return
}

Expand All @@ -2202,11 +2323,12 @@ func PutEvents(c *gin.Context) {
eventReq.events[i].Attachments[j].Key = attachment.key
}
}

uploadAttachmentSpan.End()
}

tx, err := server.Server.PgPool.Begin(ctx)
tx, err := server.Server.PgPool.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted,
})

if err != nil {
msg := `failed to ingest events, failed to acquire transaction`
fmt.Println(msg, err)
Expand All @@ -2231,27 +2353,29 @@ func PutEvents(c *gin.Context) {
bucketUnhandledExceptionsTracer := otel.Tracer("bucket-unhandled-exceptions-tracer")
_, bucketUnhandledExceptionsSpan := bucketUnhandledExceptionsTracer.Start(ctx, "bucket-unhandled-exceptions")

defer bucketUnhandledExceptionsSpan.End()

if err := eventReq.bucketUnhandledExceptions(ctx, &tx); err != nil {
msg := `failed to bucket unhandled exceptions`
fmt.Println(msg, err)
c.JSON(http.StatusInternalServerError, gin.H{
"error": msg,
})
bucketUnhandledExceptionsSpan.End()
return
}

// start span to trace bucketing ANRs
bucketAnrsTracer := otel.Tracer("bucket-anrs-tracer")
_, bucketAnrsSpan := bucketAnrsTracer.Start(ctx, "bucket-anrs-exceptions")

defer bucketAnrsSpan.End()

if err := eventReq.bucketANRs(ctx, &tx); err != nil {
msg := `failed to bucket anrs`
fmt.Println(msg, err)
c.JSON(http.StatusInternalServerError, gin.H{
"error": msg,
})
bucketAnrsSpan.End()
return
}

Expand All @@ -2271,7 +2395,7 @@ func PutEvents(c *gin.Context) {
}
}

if err := eventReq.save(ctx, &tx); err != nil {
if err := eventReq.end(ctx, &tx); err != nil {
msg := `failed to save event request`
fmt.Println(msg, err)
c.JSON(http.StatusInternalServerError, gin.H{
Expand All @@ -2286,12 +2410,8 @@ func PutEvents(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{
"error": msg,
})
bucketUnhandledExceptionsSpan.End()
bucketAnrsSpan.End()
return
}
bucketUnhandledExceptionsSpan.End()
bucketAnrsSpan.End()

c.JSON(http.StatusAccepted, gin.H{"ok": "accepted"})
}
12 changes: 12 additions & 0 deletions self-host/postgres/20241011140121_alter_event_reqs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- migrate:up
-- add a nullable integer `status` column to event_reqs; rows inserted
-- without an explicit status default to 0 (pending).
alter table if exists event_reqs
add status int default 0;

comment on column event_reqs.status is 'status of event request: 0 is pending, 1 is done';

-- mark every row that exists at migration time as done (1).
update event_reqs set status = 1;

-- migrate:down
-- remove the `status` column, if present.
alter table if exists event_reqs
drop if exists status;

0 comments on commit 1465024

Please sign in to comment.