Skip to content

Commit

Permalink
Sync process and backend plugin tweaks (#4939)
Browse files Browse the repository at this point in the history
- Split up sync requests into batches to run concurrently
- Add more logging for when API calls to Grafana fail to parse
- Call sync from backend plugin when status is called
- Lock sync from backend plugin to only run every 5 mins
- Add timer display for API call to sync to return remaining time before
sync can execute
- Remove locks from celery task since its work is low cost and we lock
in the backend plugin anyway.
  • Loading branch information
mderynck authored Aug 28, 2024
1 parent d3f034b commit 5862053
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 54 deletions.
73 changes: 39 additions & 34 deletions engine/apps/grafana_plugin/tasks/sync_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,52 @@
from apps.grafana_plugin.helpers.gcom import get_active_instance_ids
from apps.user_management.models import Organization
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.utils import task_lock

logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)


SYNC_PERIOD = timezone.timedelta(minutes=4)
SYNC_BATCH_SIZE = 500


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def sync_organizations_v2(org_ids=None):
lock_id = "sync_organizations_v2"
with task_lock(lock_id, "main") as acquired:
if acquired:
if org_ids:
logger.debug(f"Starting with provided {len(org_ids)} org_ids")
organization_qs = Organization.objects.filter(id__in=org_ids)
else:
logger.debug("Starting with all org ids")
organization_qs = Organization.objects.all()
active_instance_ids, is_cloud_configured = get_active_instance_ids()
if is_cloud_configured:
if not active_instance_ids:
logger.warning("Did not find any active instances!")
return
else:
logger.debug(f"Found {len(active_instance_ids)} active instances")
organization_qs = organization_qs.filter(stack_id__in=active_instance_ids)

logger.info(f"Syncing {len(organization_qs)} organizations")
for idx, org in enumerate(organization_qs):
if GrafanaAPIClient.validate_grafana_token_format(org.api_token):
client = GrafanaAPIClient(api_url=org.grafana_url, api_token=org.api_token)
_, status = client.sync()
if status["status_code"] != 200:
logger.error(
f"Failed to request sync stack_slug={org.stack_slug} status_code={status['status_code']} url={status['url']} message={status['message']}"
)
if idx % 1000 == 0:
logger.info(f"{idx + 1} organizations processed")
else:
logger.info(f"Skipping stack_slug={org.stack_slug}, api_token format is invalid or not set")
def start_sync_organizations_v2():
    """Fan out organization syncs as batched ``sync_organizations_v2`` tasks.

    Starts from all organizations. When ``get_active_instance_ids`` reports
    that cloud is configured, the set is narrowed to organizations whose
    ``stack_id`` is among the active instances; if there are no active
    instances, nothing is scheduled.

    Organizations whose ``api_token`` fails
    ``GrafanaAPIClient.validate_grafana_token_format`` are skipped (and
    logged). The remaining primary keys are queued in batches of at most
    ``SYNC_BATCH_SIZE`` so that the batches can be processed concurrently.
    """
    organization_qs = Organization.objects.all()
    active_instance_ids, is_cloud_configured = get_active_instance_ids()
    if is_cloud_configured:
        if not active_instance_ids:
            logger.warning("Did not find any active instances!")
            return
        logger.debug(f"Found {len(active_instance_ids)} active instances")
        organization_qs = organization_qs.filter(stack_id__in=active_instance_ids)

    logger.info(f"Found {len(organization_qs)} active organizations")
    batch = []
    for org in organization_qs:
        if not GrafanaAPIClient.validate_grafana_token_format(org.api_token):
            # No lock is taken in this function any more, so the only thing
            # to report here is the skipped organization itself.
            logger.info(f"Skipping stack_slug={org.stack_slug}, api_token format is invalid or not set")
            continue
        batch.append(org.pk)
        if len(batch) == SYNC_BATCH_SIZE:
            sync_organizations_v2.apply_async((batch,))
            # Rebind instead of clearing: the list just queued must not be
            # mutated after it has been handed to apply_async.
            batch = []
    if batch:
        # Flush the final, partially filled batch.
        sync_organizations_v2.apply_async((batch,))


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def sync_organizations_v2(org_ids=None):
    """Request a Grafana-side sync for each organization in ``org_ids``.

    Args:
        org_ids: Iterable of ``Organization`` primary keys to sync. ``None``
            or an empty collection is a no-op.

    For every matching organization a ``GrafanaAPIClient`` is built from its
    ``grafana_url`` and ``api_token`` and ``sync()`` is called. A response
    whose ``status_code`` is not 200 is logged as an error; processing then
    continues with the next organization.
    """
    if not org_ids:
        # The default of None cannot be used as an ``id__in`` query value,
        # so bail out early instead of failing inside the ORM.
        logger.debug("No org_ids provided, nothing to sync")
        return

    for org in Organization.objects.filter(id__in=org_ids):
        client = GrafanaAPIClient(api_url=org.grafana_url, api_token=org.api_token)
        _, status = client.sync()
        if status["status_code"] != 200:
            logger.error(
                f"Failed to request sync org_id={org.pk} stack_slug={org.stack_slug} status_code={status['status_code']} url={status['url']} message={status['message']}"
            )
13 changes: 8 additions & 5 deletions engine/apps/grafana_plugin/tests/test_sync_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rest_framework.test import APIClient

from apps.api.permissions import LegacyAccessControlRole
from apps.grafana_plugin.tasks import sync_organizations_v2
from apps.grafana_plugin.tasks.sync_v2 import start_sync_organizations_v2


@pytest.mark.django_db
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_invalid_auth(make_organization_and_user_with_plugin_token, make_user_au
)
@pytest.mark.django_db
def test_skip_org_without_api_token(make_organization, api_token, sync_called):
organization = make_organization(api_token=api_token)
make_organization(api_token=api_token)

with patch(
"apps.grafana_plugin.helpers.GrafanaAPIClient.sync",
Expand All @@ -70,6 +70,9 @@ def test_skip_org_without_api_token(make_organization, api_token, sync_called):
"message": "",
},
),
) as mock_sync:
sync_organizations_v2(org_ids=[organization.id])
assert mock_sync.called == sync_called
):
with patch(
"apps.grafana_plugin.tasks.sync_v2.sync_organizations_v2.apply_async", return_value=None
) as mock_sync:
start_sync_organizations_v2()
assert mock_sync.called == sync_called
1 change: 1 addition & 0 deletions engine/settings/celery_task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"apps.grafana_plugin.tasks.sync.start_sync_regions": {"queue": "long"},
"apps.metrics_exporter.tasks.calculate_and_cache_metrics": {"queue": "long"},
"apps.metrics_exporter.tasks.calculate_and_cache_user_was_notified_metric": {"queue": "long"},
"apps.grafana_plugin.tasks.sync_v2.start_sync_organizations_v2": {"queue": "long"},
"apps.grafana_plugin.tasks.sync_v2.sync_organizations_v2": {"queue": "long"},
# SLACK
"apps.integrations.tasks.notify_about_integration_ratelimit_in_slack": {"queue": "slack"},
Expand Down
4 changes: 2 additions & 2 deletions grafana-plugin/pkg/plugin/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (a *App) GetPermissions(settings *OnCallPluginSettings, onCallUser *OnCallU
var permissions []OnCallPermission
err = json.Unmarshal(body, &permissions)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

if res.StatusCode == 200 {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (a *App) GetAllPermissions(settings *OnCallPluginSettings) (map[string]map[
var permissions map[string]map[string]interface{}
err = json.Unmarshal(body, &permissions)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

if res.StatusCode == 200 {
Expand Down
2 changes: 1 addition & 1 deletion grafana-plugin/pkg/plugin/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (a *App) GetOtherPluginSettings(settings *OnCallPluginSettings, pluginID st
var result map[string]interface{}
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

return result, nil
Expand Down
12 changes: 12 additions & 0 deletions grafana-plugin/pkg/plugin/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ type OnCallStatus struct {
APIURL string `json:"api_url"`
}

// AllOk reports whether every individual plugin-connection check on the
// status is Ok. The checks are evaluated in declaration order and the first
// failing one short-circuits the result to false.
func (s *OnCallStatus) AllOk() bool {
	switch {
	case !s.PluginConnection.Settings.Ok,
		!s.PluginConnection.GrafanaURLFromPlugin.Ok,
		!s.PluginConnection.ServiceAccountToken.Ok,
		!s.PluginConnection.OnCallAPIURL.Ok,
		!s.PluginConnection.OnCallToken.Ok,
		!s.PluginConnection.GrafanaURLFromEngine.Ok:
		return false
	default:
		return true
	}
}

func (c *OnCallPluginConnection) ValidateOnCallPluginSettings(settings *OnCallPluginSettings) bool {
// TODO: Return all instead of first?
if settings.StackID == 0 {
Expand Down Expand Up @@ -262,4 +271,7 @@ func (a *App) handleStatus(w http.ResponseWriter, req *http.Request) {
}
w.WriteHeader(http.StatusOK)

if status.AllOk() {
a.doSync(req.Context(), false)
}
}
46 changes: 38 additions & 8 deletions grafana-plugin/pkg/plugin/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,24 @@ import (

// OnCallSyncCache holds the state used to rate-limit and de-duplicate sync
// requests sent from the backend plugin.
type OnCallSyncCache struct {
// syncMutex is acquired with TryLock in makeSyncRequest and released later
// by the timer armed in UnlockAfterDelay.
syncMutex sync.Mutex
// timer is the pending delayed-unlock timer set by UnlockAfterDelay.
timer *time.Timer
// lastOnCallSync is the previously saved sync payload; compareSyncData
// treats nil as "nothing saved to compare against".
lastOnCallSync *OnCallSync
// start is set to time.Now() when the lock is acquired and is used to
// report the remaining time while the lock is held.
start time.Time
}

// SyncCacheAlreadyLocked is the error returned by makeSyncRequest when the
// sync mutex could not be acquired. doSync detects it with errors.As so the
// skipped sync is logged at info level.
type SyncCacheAlreadyLocked struct {
// Message is the human-readable description returned by Error.
Message string
}

// Error implements the error interface by returning Message unchanged.
func (e *SyncCacheAlreadyLocked) Error() string {
return e.Message
}

// UnlockAfterDelay arms a timer that releases syncMutex once delay has
// elapsed and logs the release. The timer is stored on the cache in
// oc.timer. The caller is expected to hold syncMutex when calling this.
func (oc *OnCallSyncCache) UnlockAfterDelay(delay time.Duration) {
	release := func() {
		oc.syncMutex.Unlock()
		log.DefaultLogger.Info("released OnCallSyncCache lock")
	}
	oc.timer = time.AfterFunc(delay, release)
}

func (a *App) handleSync(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -53,17 +70,24 @@ func (a *App) handleSync(w http.ResponseWriter, req *http.Request) {
return
}
} else {
go func() {
err := a.makeSyncRequest(req.Context(), forceSend)
if err != nil {
log.DefaultLogger.Error("Error making sync request", "error", err)
}
}()
a.doSync(req.Context(), forceSend)
}

w.WriteHeader(http.StatusOK)
}

// doSync runs makeSyncRequest in a background goroutine and logs the outcome.
//
// A nil error (successful sync) logs nothing. A *SyncCacheAlreadyLocked error
// means the sync was skipped because the cache lock is still held and is
// logged at info level. Any other error is logged at error level.
//
// NOTE(review): callers in this file pass req.Context(), which net/http
// cancels once the handler returns, while this goroutine may still be
// running. Confirm makeSyncRequest tolerates a cancelled context.
func (a *App) doSync(ctx context.Context, forceSend bool) {
	go func() {
		err := a.makeSyncRequest(ctx, forceSend)
		if err == nil {
			// Success: errors.As reports false for a nil error, so without
			// this guard the error branch below would log on every sync.
			return
		}

		var cacheAlreadyLocked *SyncCacheAlreadyLocked
		if errors.As(err, &cacheAlreadyLocked) {
			log.DefaultLogger.Info("Skipping sync", "message", err)
			return
		}
		log.DefaultLogger.Error("Error making sync request", "error", err)
	}()
}

func (a *App) compareSyncData(newOnCallSync *OnCallSync) bool {
if a.lastOnCallSync == nil {
log.DefaultLogger.Info("No saved OnCallSync to compare")
Expand All @@ -80,10 +104,16 @@ func (a *App) makeSyncRequest(ctx context.Context, forceSend bool) error {
}()

locked := a.syncMutex.TryLock()
const duration = 5 * 60 * time.Second
if !locked {
return errors.New("sync already in progress")
elapsed := time.Since(a.start)
remaining := duration - elapsed
msg := fmt.Sprintf("sync already in progress, OnCallSyncCache is locked, remaining time %.0fs", remaining.Seconds())
return &SyncCacheAlreadyLocked{Message: msg}
}
defer a.syncMutex.Unlock()

defer a.UnlockAfterDelay(duration)
a.start = time.Now()

onCallPluginSettings, err := a.OnCallSettingsFromContext(ctx)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions grafana-plugin/pkg/plugin/teams.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (a *App) GetTeamsForUser(settings *OnCallPluginSettings, onCallUser *OnCall
var result []Team
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

if res.StatusCode == 200 {
Expand Down Expand Up @@ -115,7 +115,7 @@ func (a *App) GetAllTeams(settings *OnCallPluginSettings) ([]OnCallTeam, error)
var result Teams
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

if res.StatusCode == 200 {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (a *App) GetTeamsMembersForTeam(settings *OnCallPluginSettings, onCallTeam
var result []OrgUser
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

if res.StatusCode == 200 {
Expand Down
2 changes: 1 addition & 1 deletion grafana-plugin/pkg/plugin/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (a *App) GetAllUsers(settings *OnCallPluginSettings) ([]OnCallUser, error)
var result []OrgUser
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON response: %v", err)
return nil, fmt.Errorf("failed to parse JSON response: %v body=%v", err, body)
}

if res.StatusCode == 200 {
Expand Down

0 comments on commit 5862053

Please sign in to comment.