Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dsschult committed Oct 21, 2024
1 parent f752eb1 commit 801a185
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 18 deletions.
4 changes: 2 additions & 2 deletions iceprod/scheduled_tasks/job_temp_cleaning.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import asyncio
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from functools import partial
import logging
import os
Expand Down Expand Up @@ -87,7 +87,7 @@ async def run(rest_client, temp_dir, list_dirs, rmtree, dataset=None, debug=Fals
debug (bool): debug flag to propagate exceptions
"""
suspend_time = timedelta(days=90)
now = datetime.utcnow()
now = datetime.now(UTC)

try:
# get all the job_indexes currently in tmp
Expand Down
4 changes: 2 additions & 2 deletions iceprod/scheduled_tasks/log_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import argparse
import asyncio
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
import logging

from iceprod.client_auth import add_auth_to_argparse, create_rest_client
Expand All @@ -26,7 +26,7 @@ async def run(rest_client, debug=False):
debug (bool): debug flag to propagate exceptions
"""
async def delete_logs(name, days):
time_limit = datetime.utcnow() - timedelta(days=days)
time_limit = datetime.now(UTC) - timedelta(days=days)
args = {
'to': datetime2str(time_limit),
'name': name,
Expand Down
7 changes: 4 additions & 3 deletions iceprod/scheduled_tasks/non_active_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import argparse
import asyncio
from datetime import datetime
from datetime import datetime, UTC
import logging

from iceprod.client_auth import add_auth_to_argparse, create_rest_client
Expand Down Expand Up @@ -64,6 +64,7 @@ async def delete_pilot(pilot_id):

awaitables = set()
reset_pilots = set()
now = datetime.now(UTC)
for dataset_id in dataset_ids:
tasks = dataset_tasks[dataset_id]
if 'processing' in tasks:
Expand All @@ -72,7 +73,7 @@ async def delete_pilot(pilot_id):
args = {'keys': 'status|status_changed'}
task = await rest_client.request('GET', f'/datasets/{dataset_id}/tasks/{task_id}', args)
# check status, and that we haven't just changed status
if task['status'] == 'processing' and (datetime.utcnow()-str2datetime(task['status_changed'])).total_seconds() > 600:
if task['status'] == 'processing' and (now-str2datetime(task['status_changed'])).total_seconds() > 600:
logger.info('dataset %s reset task %s', dataset_id, task_id)
awaitables.add(reset(dataset_id,task_id))

Expand All @@ -82,7 +83,7 @@ async def delete_pilot(pilot_id):
args = {'keys': 'status|status_changed'}
task = await rest_client.request('GET', f'/datasets/{dataset_id}/tasks/{task_id}', args)
# check status, and that we haven't just changed status
if task['status'] in ('reset', 'waiting', 'failed', 'suspended') and (datetime.utcnow()-str2datetime(task['status_changed'])).total_seconds() > 600:
if task['status'] in ('reset', 'waiting', 'failed', 'suspended') and (now-str2datetime(task['status_changed'])).total_seconds() > 600:
reset_pilots.add(task_id)

for p in pilots.values():
Expand Down
1 change: 1 addition & 0 deletions iceprod/server/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ async def task_success(self, task: GridTask, stats: dict | None = None, stdout:
if stderr and stderr.exists():
await self._upload_log(task, 'stderr', stderr.read_text())


'''
@run_on_executor
def _delete_dirs(self, dirs):
Expand Down
2 changes: 0 additions & 2 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,7 @@ async def check_iceprod(self):
queue_tasks = {j.task_id: j for j in self.jobs.values()}
server_tasks = await fut
now = datetime.now(UTC)
logger.info(f'server tasks: %r', server_tasks)
for task in server_tasks:
logger.info(f'task {task["dataset_id"]}.{task["task_id"]}')
if task['task_id'] not in queue_tasks:
# ignore anything too recent
if str2datetime(task['status_changed']) >= now - timedelta(minutes=1):
Expand Down
6 changes: 3 additions & 3 deletions iceprod/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def __init__(self, config_params=None, outfile=None, errfile=None):
logger.error('IceProd Server - version %s', version_string)

async def rotate_logs(self):
current_date = datetime.utcnow()
current_date = datetime.now(UTC)
while self.outfile and self.errfile:
if current_date.day != datetime.utcnow().day:
if current_date.day != datetime.now(UTC).day:
# rotate files
current_date = datetime.utcnow()
current_date = datetime.now(UTC)
if self.outfile:
roll_files(sys.stdout, self.outfile)
if self.errfile:
Expand Down
6 changes: 3 additions & 3 deletions iceprod/website/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

from collections import defaultdict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
import functools
import importlib.resources
import logging
Expand Down Expand Up @@ -37,7 +37,7 @@
from iceprod.server import documentation
from iceprod.server.module import FakeStatsClient, StatsClientIgnoreErrors
import iceprod.server.states
from iceprod.server.util import nowstr
from iceprod.server.util import datetime2str, nowstr

logger = logging.getLogger('website')

Expand Down Expand Up @@ -596,7 +596,7 @@ async def post(self):
}
if self.auth_refresh_token:
args['refresh_token'] = self.auth_refresh_token
args['expiration'] = (datetime.utcnow() + timedelta(days=30)).isoformat()
args['expiration'] = datetime2str(datetime.now(UTC) + timedelta(days=30))
await self.cred_rest_client.request('POST', f'/users/{username}/credentials', args)

else:
Expand Down
8 changes: 5 additions & 3 deletions tests/scheduled_tasks/job_temp_cleaning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import logging
from datetime import datetime,timedelta
from datetime import datetime, timedelta, UTC
from unittest.mock import patch, MagicMock, AsyncMock
from concurrent.futures import ThreadPoolExecutor

Expand Down Expand Up @@ -121,10 +121,11 @@ async def client(method, url, args=None):
rmtree.assert_awaited_once_with(path+'/0/1')

# dir with recent suspended job
now = datetime.now(UTC)
jobs['bar'] = {
'job_index': 1,
'status': 'suspended',
'status_changed': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f'),
'status_changed': now.strftime('%Y-%m-%dT%H:%M:%S.%f'),
}
rc.request = AsyncMock(side_effect=client)
listdir = AsyncMock(return_value=data)
Expand All @@ -135,10 +136,11 @@ async def client(method, url, args=None):
rmtree.assert_not_awaited()

# dir with old suspended job
now = datetime.now(UTC)
jobs['bar'] = {
'job_index': 1,
'status': 'suspended',
'status_changed': (datetime.utcnow()-timedelta(days=100)).strftime('%Y-%m-%dT%H:%M:%S'),
'status_changed': (now-timedelta(days=100)).strftime('%Y-%m-%dT%H:%M:%S'),
}
rc.request = AsyncMock(side_effect=client)
listdir = AsyncMock(return_value=data)
Expand Down

0 comments on commit 801a185

Please sign in to comment.