Skip to content

Commit

Permalink
Extend TuringClassificationSelenium to freeze/unfreeze time, and conv…
Browse files Browse the repository at this point in the history
…ert the result to a table
  • Loading branch information
francesco-ballarin committed Aug 31, 2024
1 parent 3f2ec09 commit bf6ec5f
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ def _clean_up_turing_dictionary(turing_dict: TuringDict) -> None:

input_file_client: paramiko.SSHClient | None = None
if args.input_file_host != "":
input_file_client = get_ssh_client(args.input_file_host, args.input_file_host_user) # type: ignore[operator]
input_file_client = get_ssh_client(args.input_file_host, args.input_file_host_user)
live_journal_to_live_turing(
lambda: open_file_on_ssh_host(pathlib.Path(args.input_file), input_file_client), # type: ignore[operator]
lambda: open_file_on_ssh_host(pathlib.Path(args.input_file), input_file_client),
engine.models, args.turing_race_id, args.sleep, pathlib.Path(args.output_directory),
lambda time_counter, race_ended: race_ended)
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""A selenium browser that connects to a classification page on the current live turing instance."""

import datetime
import typing
import urllib.parse

import bs4
import prettytable
import requests
import selenium.common.exceptions
import selenium.webdriver
Expand All @@ -17,8 +19,9 @@
import selenium.webdriver.remote.webelement
import selenium.webdriver.support.expected_conditions as EC # noqa: N812
import selenium.webdriver.support.ui
import tinycss2

import mathrace_interaction.time
from mathrace_interaction.time import convert_timestamp_to_number_of_seconds


class TuringClassificationSelenium:
Expand Down Expand Up @@ -55,7 +58,7 @@ class TuringClassificationSelenium:
If unlocked, it contains None.
"""

def __init__(self, root_url: str, race_id: int, max_wait: int) -> None:
def __init__(self, root_url: str, race_id: int, max_wait: float) -> None:
options = selenium.webdriver.ChromeOptions()
options.add_argument("--no-sandbox") # type: ignore[no-untyped-call]
options.add_argument("--window-size=1920,1080") # type: ignore[no-untyped-call]
Expand Down Expand Up @@ -141,12 +144,12 @@ def timer_above_value(
locator: tuple[str, str]
) -> typing.Callable[[selenium.webdriver.remote.webdriver.WebDriver], bool]:
"""Predicate used in WebDriverWait, inspired by EC.text_to_be_present_in_element."""
value_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds(value)
value_int = convert_timestamp_to_number_of_seconds(value)

def _predicate(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
try:
element_text = driver.find_element(*locator).text
element_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds(element_text)
element_int = convert_timestamp_to_number_of_seconds(element_text)
return element_int >= value_int
except selenium.common.exceptions.StaleElementReferenceException: # pragma: no cover
return False
Expand All @@ -159,8 +162,15 @@ def _predicate(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
def _wait_for_classification_computed(self) -> None:
"""Wait for classification computation."""
self.ensure_unlocked()
selenium.webdriver.support.wait.WebDriverWait(self._browser, self._max_wait).until(
JavascriptVariableEvaluatesToTrue("document.updated"))

def document_updated_is_true(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
try:
return driver.execute_script( # type: ignore[no-any-return, no-untyped-call]
"return document.updated;")
except selenium.common.exceptions.StaleElementReferenceException: # pragma: no cover
return False

selenium.webdriver.support.wait.WebDriverWait(self._browser, self._max_wait).until(document_updated_is_true)

def login(self, username: str, password: str) -> None:
"""Log into the turing instance with the provided credentials."""
Expand All @@ -171,7 +181,14 @@ def login(self, username: str, password: str) -> None:
self.find_element(selenium.webdriver.common.by.By.NAME, "password").send_keys(password)
self.find_element(selenium.webdriver.common.by.By.ID, "submit").click()
# Successful login redirects to the home page, where there is a link to change password
self._wait_for_element(selenium.webdriver.common.by.By.CSS_SELECTOR, "a[href='/accounts/password_change/']")
try:
self._wait_for_element(
selenium.webdriver.common.by.By.CSS_SELECTOR, "a[href='/accounts/password_change/']")
except selenium.common.exceptions.TimeoutException:
if "Inserisci nome utente e password corretti" in self.page_source:
raise RuntimeError("Could not login with the provided credentials")
else: # pragma: no cover
raise

def go_to_classification_page(self, classification_type: str, querystring: dict[str, str]) -> None:
"""Direct the browser to visit a specific classification type."""
Expand Down Expand Up @@ -199,6 +216,68 @@ def ensure_classification_type(self, classification_type: str) -> None:
if not self._browser.current_url.startswith(expected_url):
raise RuntimeError(f"The current page is not a {classification_type} classification")

def get_table(self) -> prettytable.PrettyTable:
"""Get the table representing the unica classification."""
self.ensure_locked()
assert self._locked_page_soup is not None
self.ensure_classification_type("unica")
table = prettytable.PrettyTable()
# Get the headers first
timer_elements = self._locked_page_soup.find_all("h3", id="orologio")
assert len(timer_elements) == 1
header1 = ["Position", "Team ID", "Team name", "Score"]
header2 = ["", "", timer_elements[0].text, ""]
num_questions = 0
while True:
question_elements = self._locked_page_soup.find_all("th", id=f"pr-{num_questions + 1}")
if len(question_elements) == 0:
break
else:
assert len(question_elements) == 1
question_text = question_elements[0].text.strip()
if "\n" in question_text: # score for each question goes to a new line
question_number, question_score = question_text.split("\n")
header1.append(question_number.strip())
header2.append(question_score.strip())
else:
header1.append(question_text)
header2.append("")
num_questions += 1
header1.append("Bonus")
header2.append("")
table.field_names = header1
if not all(cell == "" for cell in header2):
table.add_row(header2)
# Get the table content, row by row
team_id = 1
while True:
row: list[int | str] = []
position_elements = self._locked_page_soup.find_all("th", id=f"pos-{team_id}")
if len(position_elements) == 0:
break
assert len(position_elements) == 1
row.append(int(position_elements[0].text.strip()[:-1])) # :-1 is to drop the trailing degree symbol
team_id_elements = self._locked_page_soup.find_all("th", id=f"num-{team_id}")
assert len(team_id_elements) == 1
row.append(int(team_id_elements[0].text))
team_name_elements = self._locked_page_soup.find_all("th", id=f"nome-{team_id}")
assert len(team_name_elements) == 1
row.append(team_name_elements[0].text)
team_score_elements = self._locked_page_soup.find_all("th", id=f"punt-{team_id}")
assert len(team_score_elements) == 1
row.append(int(team_score_elements[0].text))
for q in [*range(1, num_questions + 1), "bonus"]:
question_score_elements = self._locked_page_soup.find_all("td", id=f"cell-{team_id}-{q}")
assert len(question_score_elements) == 1
question_score = question_score_elements[0].text.strip()
if question_score != "":
row.append(int(question_score))
else:
row.append("")
table.add_row(row)
team_id += 1
return table

def get_teams_score(self) -> list[int]:
"""Get the score of the teams in the race."""
self.ensure_locked()
Expand Down Expand Up @@ -233,30 +312,55 @@ def get_teams_position(self) -> list[int]:
team_id += 1
return positions

def get_css_sources(self) -> dict[str, str]:
"""Get the content of CSS files used in the current page."""
def get_auxiliary_files(self) -> tuple[dict[str, str], dict[str, bytes]]:
"""Get the content of CSS and font files used in the current page."""
self.ensure_locked()
assert self._locked_page_soup is not None

# Get css files first
all_css = dict()
all_css_directory = dict()

for css in self._locked_page_soup.find_all("link", rel="stylesheet"):
# Do not use the current selenium browser to fetch the css content, otherwise
# the browser would move away from the current page. However, since css content
# is static, simply downloading the page via the python package requests suffices.
response = requests.get(urllib.parse.urljoin(self._root_url, css["href"]))
request_url = urllib.parse.urljoin(self._root_url, css["href"])
response = requests.get(request_url)
assert response.status_code == 200
filename = css["href"].split("/")[-1]
assert filename not in all_css, "Cannot have to css files with the same name"
directory, filename = css["href"].rsplit("/", 1)
assert filename not in all_css, "Cannot have two css files with the same name"
all_css[filename] = response.text

return all_css
all_css_directory[filename] = directory

# Next, process each css file to extract the fonts that are required there
all_fonts = dict()

for css_filename in all_css.keys():
rules = tinycss2.parse_stylesheet(all_css[css_filename])
for rule in rules:
if rule.type == "at-rule": # which define fonts
for token in rule.content:
if token.type == "url":
font_url = token.value
if "?" not in font_url and "#" not in font_url:
request_url = urllib.parse.urljoin(
self._root_url, all_css_directory[css_filename] + "/" + font_url)
response = requests.get(request_url)
assert response.status_code == 200
font_filename = font_url.split("/")[-1]
assert font_filename not in all_fonts, "Cannot have two font files with the same name"
all_fonts[font_filename] = response.content
all_css[css_filename] = all_css[css_filename].replace(font_url, font_filename)

return all_css, all_fonts

def get_cleaned_html_source(self) -> str:
"""
Get a cleaned HTML source code of a page of the turing instance for local download.
The HTML code is preprocessed as follows:
- the path of any css should be flattened to the one returned by get_css_sources.
- the path of any auxiliary file should be flattened to the one returned by get_auxiliary_files.
- any local link to the live instance is removed, since it would not be available locally.
- any javascript is removed, since in order to be visible locally the page cannot contain
any script that requires the live server.
Expand All @@ -283,13 +387,40 @@ def get_cleaned_html_source(self) -> str:
# Return postprocessed page
return str(soup)

def freeze_time(self, current_time: datetime.datetime, force_classification_update: bool = True) -> None:
"""Freeze the race time at the specified time."""
javascript_timestamp = int(current_time.timestamp() * 1000)
# Overwrite the javascript timer
self._browser.execute_script(f"""\
if (!('timer_backup' in document.client)) {{
document.client.timer_backup = document.client.timer.now;
}}
document.client.timer.now = function mock_timer() {{
return {javascript_timestamp};
}}""") # type: ignore[no-untyped-call]
if force_classification_update:
# Force an update of the race time, and wait for the updated event to be triggered
# in order to be sure that the classification has been updated
self._browser.execute_script("""\
document.client.gara.time = document.client.timer.now()
document.updated = false;""") # type: ignore[no-untyped-call]
self._wait_for_classification_computed()

class JavascriptVariableEvaluatesToTrue:
"""Helper class used to wait until a javascript variable is true."""

def __init__(self, variable: str) -> None:
self._variable = variable

def __call__(self, driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
"""Condition for waiting until the javascript variable is true."""
return driver.execute_script(f"return {self._variable};") # type: ignore[no-any-return, no-untyped-call]
def unfreeze_time(self, force_classification_update: bool = True) -> None:
"""Undo a previous freeze of the race time."""
has_timer_backup = self._browser.execute_script( # type: ignore[no-untyped-call]
"return 'timer_backup' in document.client;""")
if not has_timer_backup:
raise RuntimeError("Did you forget to freeze the time?")
# Restore the javascript timer
self._browser.execute_script("""\
document.client.timer.now = document.client.timer_backup;
delete document.client.timer_backup;""") # type: ignore[no-untyped-call]
if force_classification_update:
# Force an update of the race time, and wait for the updated event to be triggered
# in order to be sure that the classification has been updated
self._browser.execute_script("""\
document.client.gara.time = document.client.timer.now()
document.updated = false;""") # type: ignore[no-untyped-call]
self._wait_for_classification_computed()
5 changes: 4 additions & 1 deletion mathrace_interaction/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ dependencies = [
"bs4",
"jsondiff",
"paramiko",
"prettytable",
"requests",
"selenium"
"selenium",
"tinycss2"
]

[project.urls]
Expand Down Expand Up @@ -91,6 +93,7 @@ module = [
"engine.models",
"jsondiff",
"simpleeval",
"tinycss2",
"Turing"
]
ignore_missing_imports = true
Expand Down
Loading

0 comments on commit bf6ec5f

Please sign in to comment.