fix(flakes): process uploads only once (#942)
joseph-sentry authored Dec 16, 2024
1 parent 536c4e0 commit e21eb33
Showing 6 changed files with 291 additions and 276 deletions.
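In short: the task's interface changes from a list of commits plus a branch to a single commit id, and deduplication moves inside the task itself. A sketch of the call-site change, using the kwargs exactly as they appear in the diffs below (`task_queue` stands in for `self.app.tasks[process_flakes_task_name]`, and `head_sha` is illustrative):

    # before: callers passed a commit list and a branch
    task_queue.apply_async(
        kwargs=dict(repo_id=repo_id, commit_id_list=[head_sha], branch=branch)
    )

    # after: callers pass one commit; the task finds that commit's
    # not-yet-processed uploads and marks them as it goes
    task_queue.apply_async(kwargs=dict(repo_id=repo_id, commit_id=head_sha))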
264 changes: 156 additions & 108 deletions tasks/process_flakes.py
@@ -1,18 +1,23 @@
 import logging
+from typing import Any

 from django.db import transaction as django_transaction
 from django.db.models import Q
 from shared.celery_config import process_flakes_task_name
-from shared.django_apps.reports.models import DailyTestRollup, Flake, TestInstance
+from shared.django_apps.reports.models import (
+    CommitReport,
+    DailyTestRollup,
+    Flake,
+    ReportSession,
+    TestInstance,
+)

 from app import celery_app
 from helpers.metrics import metrics
 from tasks.base import BaseCodecovTask

 log = logging.getLogger(__name__)


-FlakeDict = dict[tuple[str, int], Flake]

 FLAKE_EXPIRY_COUNT = 30


@@ -23,76 +28,124 @@ class ProcessFlakesTask(BaseCodecovTask, name=process_flakes_task_name):

     def run_impl(
         self,
-        _db_session,
+        _db_session: Any,
+        *,
-        repo_id,
-        commit_id_list,
-        branch,
-        **kwargs,
+        repo_id: int,
+        commit_id: str,
+        **kwargs: Any,
     ):
-        repo_id = int(repo_id)
+        """
+        This task wants to iterate through uploads for a given commit that have yet to be
+        "flake processed".
+        For each of those uploads it wants to iterate through its test instances and
+        update existing flakes' count, recent_passes_count, fail_count, and end_date fields
+        depending on whether the test instance passed or failed.
+        For each upload it wants to keep track of newly created flakes and keep those in a separate
+        collection than the existing flakes, so at the end it can bulk create the new flakes and
+        bulk update the existing flakes.
+        It also wants to increment the flaky_fail_count of the relevant DailyTestRollup when it creates
+        a new flake so it keeps track of those changes and bulk updates those as well.
+        When it's done with an upload it merges the new flakes dictionary into the existing flakes dictionary
+        and then clears the new flakes dictionary so the following upload considers the flakes created during the previous
+        iteration as existing.
+        """
         log.info(
             "Received process flakes task",
-            extra=dict(repoid=repo_id, commit=commit_id_list),
+            extra=dict(repoid=repo_id, commit=commit_id),
         )

         with metrics.timer("process_flakes"):
-            flake_dict = generate_flake_dict(repo_id)
+            uploads = ReportSession.objects.filter(
+                report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
+                report__commit__commitid=commit_id,
+                state="processed",
+            ).all()

+            curr_flakes = fetch_curr_flakes(repo_id)
+            new_flakes: dict[str, Flake] = dict()

+            rollups_to_update: list[DailyTestRollup] = []

+            flaky_tests = list(curr_flakes.keys())

+            for upload in uploads:
+                test_instances = get_test_instances(upload, flaky_tests)
+                for test_instance in test_instances:
+                    if test_instance.outcome == TestInstance.Outcome.PASS.value:
+                        flake = new_flakes.get(test_instance.test_id) or curr_flakes.get(
+                            test_instance.test_id
+                        )
+                        if flake is not None:
+                            update_flake(flake, test_instance)
+                    elif test_instance.outcome in (
+                        TestInstance.Outcome.FAILURE.value,
+                        TestInstance.Outcome.ERROR.value,
+                    ):
+                        flake = new_flakes.get(test_instance.test_id) or curr_flakes.get(
+                            test_instance.test_id
+                        )
+                        if flake:
+                            update_flake(flake, test_instance)
+                        else:
+                            flake, rollup = create_flake(test_instance, repo_id)

+                            new_flakes[test_instance.test_id] = flake

+                            if rollup:
+                                rollups_to_update.append(rollup)

+                if rollups_to_update:
+                    DailyTestRollup.objects.bulk_update(
+                        rollups_to_update,
+                        ["flaky_fail_count"],
+                    )

+                merge_flake_dict = {}

+                if new_flakes:
+                    flakes_to_merge = Flake.objects.bulk_create(new_flakes.values())
+                    merge_flake_dict: dict[str, Flake] = {
+                        flake.test_id: flake for flake in flakes_to_merge
+                    }

+                Flake.objects.bulk_update(
+                    curr_flakes.values(),
+                    [
+                        "count",
+                        "fail_count",
+                        "recent_passes_count",
+                        "end_date",
+                    ],
+                )

-            flaky_tests: list[str] = [
-                test_id for (test_id, _) in list(flake_dict.keys())
-            ]
+                curr_flakes = {**merge_flake_dict, **curr_flakes}

-            for commit_id in commit_id_list:
-                test_instances = get_test_instances(
-                    commit_id, repo_id, branch, flaky_tests
-                )
-                for test_instance in test_instances:
-                    if test_instance.outcome == TestInstance.Outcome.PASS.value:
-                        flake = flake_dict.get(
-                            (test_instance.test_id, test_instance.reduced_error_id)
-                        )
-                        if flake is not None:
-                            update_passed_flakes(test_instance, flake)
-                    elif test_instance.outcome in (
-                        TestInstance.Outcome.FAILURE.value,
-                        TestInstance.Outcome.ERROR.value,
-                    ):
-                        flake = flake_dict.get(
-                            (test_instance.test_id, test_instance.reduced_error_id)
-                        )
-                        upserted_flake = upsert_failed_flake(
-                            test_instance, flake, repo_id
-                        )
-                        if flake is None:
-                            flake_dict[
-                                (
-                                    upserted_flake.test_id,
-                                    upserted_flake.reduced_error_id,
-                                )
-                            ] = upserted_flake
+                new_flakes.clear()

+                upload.state = "flake_processed"
+                upload.save()
                 django_transaction.commit()

         log.info(
             "Successfully processed flakes",
-            extra=dict(repoid=repo_id, commit=commit_id_list),
+            extra=dict(repoid=repo_id, commit=commit_id),
        )

         return {"successful": True}


 def get_test_instances(
-    commit_id: str,
-    repo_id: int,
-    branch: str,
+    upload: ReportSession,
     flaky_tests: list[str],
 ) -> list[TestInstance]:
-    # get test instances on this repo commit branch combination that either:
+    # get test instances on this upload that either:
     # - failed
     # - passed but belong to an already flaky test

-    repo_commit_branch_filter = (
-        Q(commitid=commit_id) & Q(repoid=repo_id) & Q(branch=branch)
-    )
+    upload_filter = Q(upload_id=upload.id)
     test_failed_filter = Q(outcome=TestInstance.Outcome.ERROR.value) | Q(
         outcome=TestInstance.Outcome.FAILURE.value
     )
@@ -101,78 +154,73 @@ def get_test_instances(
     )
     test_instances = list(
         TestInstance.objects.filter(
-            repo_commit_branch_filter
-            & (test_failed_filter | test_passed_but_flaky_filter)
+            upload_filter & (test_failed_filter | test_passed_but_flaky_filter)
         )
         .select_related("test")
         .all()
     )
     return test_instances


-def generate_flake_dict(repo_id: int) -> FlakeDict:
+def fetch_curr_flakes(repo_id: int) -> dict[str, Flake]:
     flakes = Flake.objects.filter(repository_id=repo_id, end_date__isnull=True).all()
-    flake_dict = dict()
-    for flake in flakes:
-        flake_dict[(flake.test_id, flake.reduced_error_id)] = flake
-    return flake_dict
+    return {flake.test_id: flake for flake in flakes}
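Note the change of key shape that comes with this rename: the removed FlakeDict was keyed by (test_id, reduced_error_id) tuples, while fetch_curr_flakes keys flakes by test_id alone (create_flake below always stores reduced_error=None). Lookups simplify accordingly, as both sides of the diff show:

    # before
    flake = flake_dict.get((test_instance.test_id, test_instance.reduced_error_id))

    # after
    flake = curr_flakes.get(test_instance.test_id)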


-def update_passed_flakes(test_instance: TestInstance, flake: Flake) -> None:
-    flake.count += 1
-    flake.recent_passes_count += 1
+def create_flake(
+    test_instance: TestInstance,
+    repo_id: int,
+) -> tuple[Flake, DailyTestRollup | None]:
+    # retroactively mark newly caught flake as flaky failure in its rollup
+    rollup = DailyTestRollup.objects.filter(
+        repoid=repo_id,
+        date=test_instance.created_at.date(),
+        branch=test_instance.branch,
+        test_id=test_instance.test_id,
+    ).first()

+    if rollup:
+        rollup.flaky_fail_count += 1
+    else:
+        log.warning(
+            "Could not find rollup when trying to update its flaky fail count",
+            extra=dict(
+                repoid=repo_id,
+                testid=test_instance.test_id,
+                branch=test_instance.branch,
+                date=test_instance.created_at.date(),
+            ),
+        )

-    if flake.recent_passes_count == FLAKE_EXPIRY_COUNT:
-        flake.end_date = test_instance.created_at
+    f = Flake(
+        repository_id=repo_id,
+        test=test_instance.test,
+        reduced_error=None,
+        count=1,
+        fail_count=1,
+        start_date=test_instance.created_at,
+        recent_passes_count=0,
+    )

-    flake.save()
+    return f, rollup
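Unlike the old upsert_failed_flake, create_flake saves nothing: it bumps the rollup's counter in memory and returns both objects, leaving persistence to the caller. The usage pattern in run_impl above is:

    flake, rollup = create_flake(test_instance, repo_id)
    new_flakes[test_instance.test_id] = flake
    if rollup:
        rollups_to_update.append(rollup)
    # ...later, one query instead of a save() per rollup:
    DailyTestRollup.objects.bulk_update(rollups_to_update, ["flaky_fail_count"])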


-def upsert_failed_flake(
+def update_flake(
+    flake: Flake,
     test_instance: TestInstance,
-    flake: Flake | None,
-    repo_id: int,
-) -> Flake:
-    if flake is None:
-        flake = Flake(
-            repository_id=repo_id,
-            test=test_instance.test,
-            reduced_error=test_instance.reduced_error_id,
-            count=1,
-            fail_count=1,
-            start_date=test_instance.created_at,
-            recent_passes_count=0,
-        )
-        flake.save()
-
-        # retroactively mark newly caught flake as flaky failure in its rollup
-        rollup = DailyTestRollup.objects.filter(
-            repoid=repo_id,
-            date=test_instance.created_at.date(),
-            branch=test_instance.branch,
-            test_id=test_instance.test_id,
-        ).first()
-
-        if rollup:
-            rollup.flaky_fail_count += 1
-            rollup.save()
-        else:
-            log.warning(
-                "Could not find rollup when trying to update its flaky fail count",
-                extra=dict(
-                    repoid=repo_id,
-                    testid=test_instance.test_id,
-                    branch=test_instance.branch,
-                    date=test_instance.created_at.date(),
-                ),
-            )
-    else:
-        flake.count += 1
-        flake.fail_count += 1
-        flake.recent_passes_count = 0
-        flake.save()
+) -> None:
+    flake.count += 1

-    return flake
+    match test_instance.outcome:
+        case TestInstance.Outcome.PASS.value:
+            flake.recent_passes_count += 1
+            if flake.recent_passes_count == FLAKE_EXPIRY_COUNT:
+                flake.end_date = test_instance.created_at
+        case TestInstance.Outcome.FAILURE.value | TestInstance.Outcome.ERROR.value:
+            flake.fail_count += 1
+            flake.recent_passes_count = 0
+        case _:
+            pass
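update_flake likewise no longer calls save(); new flakes are persisted by bulk_create and updated ones by the bulk_update in run_impl. The expiry rule itself is unchanged: FLAKE_EXPIRY_COUNT (30) consecutive passes close a flake, and any failure resets the streak. A hedged illustration, where `failing` and `passing` are hypothetical TestInstance objects with the corresponding outcomes:

    flake, _ = create_flake(failing, repo_id)  # fail_count=1, recent_passes_count=0
    for _ in range(FLAKE_EXPIRY_COUNT):
        update_flake(flake, passing)
    assert flake.end_date == passing.created_at  # a streak of 30 passes expires it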


RegisteredProcessFlakesTask = celery_app.register_task(ProcessFlakesTask())
6 changes: 1 addition & 5 deletions tasks/sync_pull.py
@@ -407,7 +407,6 @@ def update_pull_commits(
             db_session,
             repository,
             pull.head,
-            pull_dict["head"]["branch"],
             current_yaml,
         )

@@ -534,7 +533,6 @@ def trigger_process_flakes(
         db_session: sqlalchemy.orm.Session,
         repository: Repository,
         pull_head: str,
-        branch: str,
         current_yaml: UserYaml,
     ):
         if (
@@ -543,9 +541,7 @@
             > 0
         ):
             self.app.tasks[process_flakes_task_name].apply_async(
-                kwargs=dict(
-                    repo_id=repository.repoid, commit_id_list=[pull_head], branch=branch
-                )
+                kwargs=dict(repo_id=repository.repoid, commit_id=pull_head)
             )

     def trigger_ai_pr_review(self, enriched_pull: EnrichedPull, current_yaml: UserYaml):
14 changes: 7 additions & 7 deletions tasks/test_results_finisher.py
@@ -123,14 +123,14 @@ def process_impl_within_lock(
         assert commit, "commit not found"
         repo = commit.repository

-        if should_do_flaky_detection(repo, commit_yaml) and (
-            commit.merged is True or commit.branch == repo.branch
-        ):
-            self.app.tasks[process_flakes_task_name].apply_async(
-                kwargs=dict(
-                    repo_id=repoid, commit_id_list=[commit.commitid], branch=repo.branch
+        if should_do_flaky_detection(repo, commit_yaml):
+            if commit.merged is True or commit.branch == repo.branch:
+                self.app.tasks[process_flakes_task_name].apply_async(
+                    kwargs=dict(
+                        repo_id=repoid,
+                        commit_id=commit.commitid,
+                    )
                 )
-            )

         if commit.branch is not None:
             self.app.tasks[cache_test_rollups_task_name].apply_async(
