Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental Updates: honor events before changed_at #237

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions internal/config/incremental_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ type IncrementalConfigurableInitAndValidatable interface {
IncrementalInitAndValidate() error
}

// incrementalFetchBacklog is an offset for past changes to consider for incrementalFetch.
//
// It may happen that updates do not arrive in the database in order of their changed_at value. As this value is set by
// Icinga Notifications Web on some webserver, there might be a delta between setting the timestamp and the entry
// getting committed in the database. This delta may not be evenly distributed, especially when the web part processes
// multiple changes concurrently.
//
// Fetched elements can be compared based on their changed_at value and be ignored if it has not changed.
const incrementalFetchBacklog = 10 * time.Minute

// incrementalFetch queries all recently changed elements of BaseT and stores them in changeConfigSetField.
//
// The RuntimeConfig.configChangeTimestamps map contains the last known timestamp for each BaseT table. Only those
Expand Down Expand Up @@ -62,9 +72,12 @@ func incrementalFetch[
stmtArgs []any
)
if hasChangedAt {
stmtLogger = stmtLogger.With(zap.Time("changed_at", changedAt.Time()))
changedAtBacklog := types.UnixMilli(changedAt.Time().Add(-incrementalFetchBacklog))
stmtLogger = stmtLogger.With(
zap.Time("last_known_changed_at", changedAt.Time()),
zap.Time("compared_changed_at", changedAtBacklog.Time()))
stmt += ` WHERE "changed_at" > ?`
stmtArgs = []any{changedAt}
stmtArgs = []any{changedAtBacklog}
}

stmt = r.db.Rebind(stmt + ` ORDER BY "changed_at"`)
Expand Down Expand Up @@ -159,7 +172,7 @@ func incrementalApplyPending[
) {
startTime := time.Now()
tableName := database.TableName(T(nil))
countErr, countDelSkip, countDel, countUpdate, countNew := 0, 0, 0, 0, 0
countErr, countDel, countUpdate, countNew := 0, 0, 0, 0

if *configSetField == nil {
*configSetField = make(map[PK]T)
Expand Down Expand Up @@ -197,8 +210,7 @@ func incrementalApplyPending[
zap.Any("id", id))

if newT == nil && !oldExists {
countDelSkip++
logger.Warn("Skipping unknown marked as deleted configuration element")
logger.Debug("Skipping unknown element marked as deleted")
} else if newT == nil {
logger := logger.With(zap.Object("deleted", oldT))
if err := deleteAction(id, oldT); err != nil {
Expand All @@ -208,6 +220,13 @@ func incrementalApplyPending[
}
} else if oldExists {
logger := logger.With(zap.Object("old", oldT), zap.Object("update", newT))

if oldT.GetChangedAt().Time().Equal(newT.GetChangedAt().Time()) {
logger.Debugw("Skipping known element with unchanged changed_at timestamp",
zap.Time("changed_at", oldT.GetChangedAt().Time()))
continue
}

reAdd := updateFn == nil
if updateFn != nil {
if err := updateFn(oldT, newT); errors.Is(err, errRemoveAndAddInstead) {
Expand All @@ -229,6 +248,7 @@ func incrementalApplyPending[
continue
}
}

countUpdate++
logger.Debug("Updated known configuration element")
} else {
Expand All @@ -242,15 +262,14 @@ func incrementalApplyPending[
}

*changeConfigSetField = nil
appliedChanges := countErr > 0 || countDelSkip > 0 || countDel > 0 || countUpdate > 0 || countNew > 0
appliedChanges := countErr > 0 || countDel > 0 || countUpdate > 0 || countNew > 0

logger := r.logger.With(
zap.String("table", tableName),
zap.Duration("took", time.Since(startTime)))
if appliedChanges {
logger.Infow("Applied configuration updates",
zap.Int("faulty_elements", countErr),
zap.Int("deleted_unknown_elements", countDelSkip),
zap.Int("deleted_elements", countDel),
zap.Int("updated_elements", countUpdate),
zap.Int("new_elements", countNew))
Expand Down
Loading