-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add live_turing_to_html script (although without unit and integration…
… testing)
- Loading branch information
1 parent
bf6ec5f
commit 7131bbc
Showing
5 changed files
with
332 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
278 changes: 278 additions & 0 deletions
278
mathrace_interaction/mathrace_interaction/live_turing_to_html.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,278 @@ | ||
# Copyright (C) 2024 by the Turing @ DMF authors | ||
# | ||
# This file is part of Turing @ DMF. | ||
# | ||
# SPDX-License-Identifier: AGPL-3.0-or-later | ||
"""Follow a live session in turing and convert it into a sequence of html files.""" | ||
|
||
import argparse | ||
import datetime | ||
import json | ||
import pathlib | ||
import shutil | ||
import time | ||
import types | ||
import typing | ||
|
||
import prettytable | ||
import pytz | ||
|
||
from mathrace_interaction.network import TuringClassificationSelenium | ||
|
||
|
||
def live_turing_to_html( | ||
turing_url: str, turing_models: types.ModuleType, turing_race_id: int, turing_race_admin_password: str, | ||
sleep: float, output_directory: pathlib.Path, compute_current_time: typing.Callable[[int], datetime.datetime], | ||
termination_condition: typing.Callable[[int], bool] | ||
) -> None: # pragma: no cover | ||
""" | ||
Follow a live session in turing and convert it into a sequence of html files. | ||
Parameters | ||
---------- | ||
turing_url | ||
The URL of the live turing instance. | ||
turing_models | ||
The python module containing the turing model Gara. | ||
turing_race_id | ||
The ID of the turing race to follow. | ||
turing_race_admin_password | ||
The password of the administrator of the turing race. | ||
sleep | ||
The amount of time to wait between consecutive reads of the turing state. | ||
output_directory | ||
The path of the output directory | ||
compute_current_time | ||
A function that computes the current time given the current time counter | ||
termination_condition | ||
A function to determine whether to terminate the processing given the current time counter. | ||
""" | ||
# Get the actual turing models out of the turing_models argument | ||
Gara = getattr(turing_models, "Gara") # noqa: N806 | ||
|
||
# Get the turing race from its ID | ||
turing_race = Gara.objects.get(pk=turing_race_id) | ||
|
||
# The race must have been started before running this script | ||
if turing_race.inizio is None: | ||
raise RuntimeError(f"Please start race {turing_race_id} from the turing web interface") | ||
|
||
# Create the output directory if it does not exist yet | ||
output_directory.mkdir(parents=True, exist_ok=True) | ||
|
||
# Constants associated to the two browsers we will open | ||
LIVE = 0 # noqa: N806 | ||
COMPARISON = 1 # noqa: N806 | ||
|
||
# Constants associated to the table we will create | ||
POSITION_COLUMN = 0 # noqa: N806 | ||
TEAM_ID_COLUMN = 1 # noqa: N806 | ||
TEAM_NAME_COLUMN = 2 # noqa: N806 | ||
SCORE_COLUMN = 3 # noqa: N806 | ||
|
||
# Create subdirectories in the output directory, if they do not exist yet | ||
datetime_files_directory = output_directory / "datetime_files" | ||
live_turing_json_files_directory = output_directory / "live_turing_json_files" | ||
html_files_directory = [output_directory / "html_files", output_directory / "html_files_comparison"] | ||
table_files_directory = [output_directory / "table_files", output_directory / "table_files_comparison"] | ||
podium_change_files_directory = output_directory / "podium_change_files" | ||
for directory in ( | ||
datetime_files_directory, live_turing_json_files_directory, *html_files_directory, *table_files_directory, | ||
podium_change_files_directory | ||
): | ||
directory.mkdir(parents=True, exist_ok=True) | ||
|
||
# Read the current time counter if available, otherwise set it to zero | ||
time_counter = 0 | ||
time_counter_file = output_directory / "time_counter.txt" | ||
if time_counter_file.exists(): | ||
time_counter = int(time_counter_file.read_text()) | ||
time_counter += 1 | ||
|
||
# Open two browsers to access the classification with querystring ?ended=False and ?ended=True | ||
browsers = [TuringClassificationSelenium(turing_url, turing_race.pk, sleep // 2) for _ in range(2)] | ||
for browser in browsers: | ||
browser.login(turing_race.admin.username, turing_race_admin_password) | ||
# Connect the live browser to the live instance with ?ended=False | ||
browsers[LIVE].go_to_classification_page("unica", {"ended": "false", "computation_rate": str(sleep // 4)}) | ||
|
||
# Save CSS files for HTML export | ||
if time_counter == 0: | ||
browsers[LIVE].lock() | ||
css_files, font_files = browsers[LIVE].get_auxiliary_files() | ||
for (auxiliary_files, write_content) in ( | ||
(css_files, lambda path, content: path.write_text(content)), | ||
(font_files, lambda path, content: path.write_bytes(content)) | ||
): | ||
for (filename, content) in auxiliary_files.items(): | ||
write_content(html_files_directory[LIVE] / filename, content) # type: ignore[no-untyped-call] | ||
shutil.copy( | ||
html_files_directory[LIVE] / filename, | ||
html_files_directory[COMPARISON] / filename) | ||
browsers[LIVE].unlock() | ||
|
||
# Continuously read the turing state | ||
previous_positions = None | ||
previous_scores = None | ||
while True: | ||
print(f"{time_counter=}") | ||
# Compute the current time, up to the microsecond | ||
current_time = compute_current_time(time_counter) | ||
actual_time = datetime.datetime.now(current_time.tzinfo) | ||
inizio = turing_race.inizio.astimezone(current_time.tzinfo) | ||
timestamp = (current_time - inizio).total_seconds() | ||
# Precision to the second is more then enough for our goals: strip the microseconds, | ||
# and recompute the dates | ||
timestamp = int(timestamp) | ||
current_time = inizio + datetime.timedelta(seconds=timestamp) | ||
# Write out current time | ||
(datetime_files_directory / f"{time_counter}.datetime").write_text("""Computed: {current_time} | ||
Actual: {actual_time}""") | ||
print(f"\tcomputed time is {current_time}") | ||
print(f"\tactual time is {actual_time}") | ||
print(f"\telapsed number of seconds {timestamp}") | ||
# Backup the turing dictionary associated to the race at the time represented by the current counter | ||
with open(live_turing_json_files_directory / f"{time_counter}.json", "w") as turing_json_file: | ||
turing_json_file.write(json.dumps(turing_race.to_dict(), indent=4)) | ||
shutil.copy( | ||
live_turing_json_files_directory / f"{time_counter}.json", | ||
live_turing_json_files_directory / "latest.json") | ||
# Download browser content | ||
html: list[str] = [None, None] # type: ignore[list-item] | ||
table: list[prettytable.PrettyTable] = [None, None] # type: ignore[list-item] | ||
for (INSTANCE, browser) in enumerate(browsers): # noqa: N806 | ||
print(f'\tupdating {"live" if INSTANCE == LIVE else "comparison"} browser') | ||
if INSTANCE == LIVE: | ||
# Freeze the browser at the current time | ||
browser.freeze_time(current_time) | ||
else: | ||
# Time does not get updated in the comparison browser, and hence go to the updated classification page | ||
browser.go_to_classification_page("unica", { | ||
"ended": "true", "computation_rate": "1", "race_time": str(timestamp)}) | ||
# Save the content of the browser | ||
browser.lock() | ||
html[INSTANCE] = browser.get_cleaned_html_source() | ||
table[INSTANCE] = browser.get_table() | ||
browser.unlock() | ||
# Do not bother unfreezing time in the live browser, since it would immediately be frozen again | ||
# at the next iteration | ||
# Write out the html files | ||
for INSTANCE in (LIVE, COMPARISON): # noqa: N806 | ||
assert html[INSTANCE] is not None | ||
(html_files_directory[INSTANCE] / f"{time_counter}.html").write_text( | ||
html[INSTANCE]) | ||
# Add livejs script to the latest page so that it refreshes automatically | ||
# when uploaded to an HTTP server. | ||
# Note: livejs will not work when opening the file locally, since the file:// is not supported: | ||
# to try it you need to have a real server and access it through http:// or https:// | ||
# As a workaround, you can start a local HTTP server by running | ||
# python3 -m http.server | ||
# in the local directory. | ||
(html_files_directory[INSTANCE] / "latest.html").write_text( | ||
(html_files_directory[INSTANCE] / f"{time_counter}.html").read_text().replace( | ||
"</head>", '<script src="https://livejs.com/live.js"></script></head>')) | ||
with open(html_files_directory[INSTANCE] / "watch.txt", "a") as text_file: | ||
text_file.write(f"updated at time counter {time_counter} ({current_time})\n") | ||
# Determine if the live table and the comparison one are the same or not | ||
assert table[LIVE] is not None | ||
warn_table = (table[LIVE].get_string() != table[COMPARISON].get_string()) | ||
# Compute team positions/scores, as a dictionary from the team ID to the team position/score | ||
positions = {r[TEAM_ID_COLUMN]: r[POSITION_COLUMN] for r in table[LIVE].rows[1:]} | ||
scores = {r[TEAM_ID_COLUMN]: r[SCORE_COLUMN] for r in table[LIVE].rows[1:]} | ||
# Compute the difference between the scores at this time and at the previous time | ||
print_fields: list[list[str]] = [None, None] # type: ignore[list-item] | ||
print_fields[LIVE] = ["Position", "Team ID", "Team name", "Score"] | ||
print_fields[COMPARISON] = ["Position", "Team ID", "Team name", "Score"] # do not assign the LIVE one! | ||
podium_change: list[tuple[int, str, int, int]] = [] | ||
if previous_positions is not None: | ||
position_update = [ | ||
previous_positions[r[TEAM_ID_COLUMN]] - positions[r[TEAM_ID_COLUMN]] for r in table[LIVE].rows[1:]] | ||
for podium_position in (3, 2, 1): | ||
if position_update[podium_position - 1] > 0: | ||
team_id = table[LIVE].rows[podium_position][TEAM_ID_COLUMN] | ||
team_name = table[LIVE].rows[podium_position][TEAM_NAME_COLUMN] | ||
podium_change.append((team_id, team_name, podium_position, previous_positions[team_id])) | ||
table[LIVE].add_column("Position update", [""] + [u if u != 0 else "" for u in position_update]) | ||
print_fields[LIVE].append("Position update") | ||
if previous_scores is not None: | ||
score_update = [ | ||
scores[r[TEAM_ID_COLUMN]] - previous_scores[r[TEAM_ID_COLUMN]] for r in table[LIVE].rows[1:]] | ||
table[LIVE].add_column("Score update", [""] + [u if u != 0 else "" for u in score_update]) | ||
print_fields[LIVE].append("Score update") | ||
# Write out the table files | ||
for INSTANCE in (LIVE, COMPARISON): # noqa: N806 | ||
assert table[INSTANCE] is not None | ||
(table_files_directory[INSTANCE] / f"{time_counter}.csv").write_text( | ||
table[INSTANCE].get_formatted_string(out_format="csv")) | ||
(table_files_directory[INSTANCE] / f"{time_counter}.html").write_text( | ||
"<html><head></head><body>" | ||
+ table[INSTANCE].get_formatted_string(fields=print_fields[INSTANCE], out_format="html", format=True) | ||
+ "</body>") | ||
shutil.copy( | ||
table_files_directory[INSTANCE] / f"{time_counter}.csv", | ||
table_files_directory[INSTANCE] / "latest.csv") | ||
(table_files_directory[INSTANCE] / "latest.html").write_text( | ||
(table_files_directory[INSTANCE] / f"{time_counter}.html").read_text().replace( | ||
"</head>", '<script src="https://livejs.com/live.js"></script></head>')) | ||
# Write out the podium change file | ||
for (text_filename, mode) in ((f"{time_counter}.txt", "w"), ("watch.txt", "a")): | ||
with open(podium_change_files_directory / text_filename, mode) as text_file: | ||
for (team_id, team_name, current_position, previous_position) in podium_change: | ||
text_file.write( | ||
f"position change at time counter {time_counter} ({current_time}): " | ||
f'team with ID {team_id} and name "{team_name}" improves from ' | ||
f"position {previous_position} to podium position {current_position}\n" | ||
) | ||
# Write out the time counter | ||
time_counter_file.write_text(str(time_counter)) | ||
# Print out table | ||
print("\t" + table[LIVE].get_string(fields=print_fields[LIVE]).replace("\n", "\n\t")) | ||
if warn_table: | ||
print("\tWARNING: live and comparison tables are different") | ||
# Break out of the loop if the race has ended | ||
if termination_condition(time_counter): | ||
break | ||
# Upate the time counter | ||
time_counter += 1 | ||
# Replace previous positions/scores | ||
previous_positions = positions | ||
previous_scores = scores | ||
# Wait before reading again the updated version of the turing state | ||
actual_time_end = datetime.datetime.now(current_time.tzinfo) | ||
wait_time = sleep - (actual_time_end - actual_time).total_seconds() | ||
if wait_time > 0: | ||
print(f"\twaiting {wait_time} seconds for next time iteration") | ||
time.sleep(wait_time) | ||
|
||
|
||
if __name__ == "__main__": # pragma: no cover | ||
# This import requires turing to be available, and thus cannot be moved to the common section. | ||
# We skip coverage testing of this part because we cannot cover this in unit tests, since they | ||
# cannot interact with turing. Testing this entrypoint is delayed to integration testing. | ||
import django | ||
django.setup() | ||
|
||
import django.conf | ||
import engine.models | ||
|
||
TIME_ZONE_SETTING = getattr(django.conf.settings, "TIME_ZONE", None) | ||
assert TIME_ZONE_SETTING is not None | ||
assert isinstance(TIME_ZONE_SETTING, str) | ||
TIME_ZONE_SETTING = pytz.timezone(TIME_ZONE_SETTING) | ||
|
||
parser = argparse.ArgumentParser(add_help=False) | ||
parser.add_argument("-u", "--turing-url", type=str, required=True, help="The URL of the live turing instance") | ||
parser.add_argument("-t", "--turing-race-id", type=int, required=True, help="ID of the turing race to follow") | ||
parser.add_argument( | ||
"-p", "--turing-race-admin-password", type=str, required=True, | ||
help="The password of the administrator of the turing race") | ||
parser.add_argument( | ||
"-s", "--sleep", type=float, required=False, default=1.0, | ||
help="The amount of time to wait between consecutive turing race exports") | ||
parser.add_argument("-o", "--output-directory", type=str, required=True, help="Path of the output directory") | ||
args = parser.parse_args() | ||
|
||
live_turing_to_html( | ||
args.turing_url, engine.models, args.turing_race_id, args.turing_race_admin_password, | ||
args.sleep, pathlib.Path(args.output_directory), lambda time_counter: datetime.datetime.now(TIME_ZONE_SETTING), | ||
lambda time_counter: False) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters