Skip to content

Commit

Permalink
Extend TuringClassificationSelenium to freeze/unfreeze time, and conv…
Browse files Browse the repository at this point in the history
…ert the result to a table
  • Loading branch information
francesco-ballarin committed Aug 31, 2024
1 parent 3f2ec09 commit f2a0ede
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""A selenium browser that connects to a classification page on the current live turing instance."""

import datetime
import typing
import urllib.parse

import bs4
import prettytable
import requests
import selenium.common.exceptions
import selenium.webdriver
Expand All @@ -17,8 +19,9 @@
import selenium.webdriver.remote.webelement
import selenium.webdriver.support.expected_conditions as EC # noqa: N812
import selenium.webdriver.support.ui
import tinycss2

import mathrace_interaction.time
from mathrace_interaction.time import convert_timestamp_to_number_of_seconds


class TuringClassificationSelenium:
Expand Down Expand Up @@ -141,12 +144,12 @@ def timer_above_value(
locator: tuple[str, str]
) -> typing.Callable[[selenium.webdriver.remote.webdriver.WebDriver], bool]:
"""Predicate used in WebDriverWait, inspired by EC.text_to_be_present_in_element."""
value_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds(value)
value_int = convert_timestamp_to_number_of_seconds(value)

def _predicate(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
try:
element_text = driver.find_element(*locator).text
element_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds(element_text)
element_int = convert_timestamp_to_number_of_seconds(element_text)
return element_int >= value_int
except selenium.common.exceptions.StaleElementReferenceException: # pragma: no cover
return False
Expand All @@ -159,8 +162,15 @@ def _predicate(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
def _wait_for_classification_computed(self) -> None:
"""Wait for classification computation."""
self.ensure_unlocked()
selenium.webdriver.support.wait.WebDriverWait(self._browser, self._max_wait).until(
JavascriptVariableEvaluatesToTrue("document.updated"))

def document_updated_is_true(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
try:
return driver.execute_script( # type: ignore[no-any-return, no-untyped-call]
"return document.updated;")
except selenium.common.exceptions.StaleElementReferenceException: # pragma: no cover
return False

selenium.webdriver.support.wait.WebDriverWait(self._browser, self._max_wait).until(document_updated_is_true)

def login(self, username: str, password: str) -> None:
"""Log into the turing instance with the provided credentials."""
Expand All @@ -171,7 +181,14 @@ def login(self, username: str, password: str) -> None:
self.find_element(selenium.webdriver.common.by.By.NAME, "password").send_keys(password)
self.find_element(selenium.webdriver.common.by.By.ID, "submit").click()
# Successful login redirects to the home page, where there is a link to change password
self._wait_for_element(selenium.webdriver.common.by.By.CSS_SELECTOR, "a[href='/accounts/password_change/']")
try:
self._wait_for_element(
selenium.webdriver.common.by.By.CSS_SELECTOR, "a[href='/accounts/password_change/']")
except selenium.common.exceptions.TimeoutException:
if "Inserisci nome utente e password corretti" in self.page_source:
raise RuntimeError("Could not login with the provided credentials")
else:
raise

def go_to_classification_page(self, classification_type: str, querystring: dict[str, str]) -> None:
"""Direct the browser to visit a specific classification type."""
Expand Down Expand Up @@ -199,6 +216,68 @@ def ensure_classification_type(self, classification_type: str) -> None:
if not self._browser.current_url.startswith(expected_url):
raise RuntimeError(f"The current page is not a {classification_type} classification")

def get_table(self) -> prettytable.PrettyTable:
"""Get the table representing the unica classification."""
self.ensure_locked()
assert self._locked_page_soup is not None
self.ensure_classification_type("unica")
table = prettytable.PrettyTable()
# Get the headers first
timer_elements = self._locked_page_soup.find_all("h3", id=f"orologio")
assert len(timer_elements) == 1
header1 = ["Position", "Team ID", "Team name", "Score"]
header2 = ["", "", timer_elements[0].text, ""]
num_questions = 0
while True:
question_elements = self._locked_page_soup.find_all("th", id=f"pr-{num_questions + 1}")
if len(question_elements) == 0:
break
else:
assert len(question_elements) == 1
question_text = question_elements[0].text.strip()
if "\n" in question_text: # score for each question goes to a new line
question_number, question_score = question_text.split("\n")
header1.append(question_number)
header2.append(question_score)
else:
header1.append(question_text)
header2.append("")
num_questions += 1
header1.append("Bonus")
header2.append("")
table.field_names = header1
if not all(cell == "" for cell in header2):
table.add_row(header2)
# Get the table content, row by row
team_id = 1
while True:
row = []
position_elements = self._locked_page_soup.find_all("th", id=f"pos-{team_id}")
if len(position_elements) == 0:
break
assert len(position_elements) == 1
row.append(int(position_elements[0].text.strip()[:-1])) # :-1 is to drop the trailing degree symbol
team_id_elements = self._locked_page_soup.find_all("th", id=f"num-{team_id}")
assert len(team_id_elements) == 1
row.append(int(team_id_elements[0].text))
team_name_elements = self._locked_page_soup.find_all("th", id=f"nome-{team_id}")
assert len(team_name_elements) == 1
row.append(team_name_elements[0].text)
team_score_elements = self._locked_page_soup.find_all("th", id=f"punt-{team_id}")
assert len(team_score_elements) == 1
row.append(int(team_score_elements[0].text))
for q in [*range(1, num_questions + 1), "bonus"]:
question_score_elements = self._locked_page_soup.find_all("td", id=f"cell-{team_id}-{q}")
assert len(question_score_elements) == 1
question_score = question_score_elements[0].text.strip()
if question_score != "":
row.append(int(question_score))
else:
row.append("")
table.add_row(row)
team_id += 1
return table

def get_teams_score(self) -> list[int]:
"""Get the score of the teams in the race."""
self.ensure_locked()
Expand Down Expand Up @@ -233,30 +312,55 @@ def get_teams_position(self) -> list[int]:
team_id += 1
return positions

def get_css_sources(self) -> dict[str, str]:
"""Get the content of CSS files used in the current page."""
def get_auxiliary_files(self) -> dict[str, str | bytes]:
"""Get the content of CSS and font files used in the current page."""
self.ensure_locked()
assert self._locked_page_soup is not None

# Get css files first
all_css = dict()
all_css_directory = dict()

for css in self._locked_page_soup.find_all("link", rel="stylesheet"):
# Do not use the current selenium browser to fetch the css content, otherwise
# the browser would move away from the current page. However, since css content
# is static, simply downloading the page via the python package requests suffices.
response = requests.get(urllib.parse.urljoin(self._root_url, css["href"]))
request_url = urllib.parse.urljoin(self._root_url, css["href"])
response = requests.get(request_url)
assert response.status_code == 200
filename = css["href"].split("/")[-1]
assert filename not in all_css, "Cannot have to css files with the same name"
directory, filename = css["href"].rsplit("/", 1)
assert filename not in all_css, "Cannot have two css files with the same name"
all_css[filename] = response.text

return all_css
all_css_directory[filename] = directory

# Next, process each css file to extract the fonts that are required there
all_fonts = dict()

for css_filename in all_css.keys():
rules = tinycss2.parse_stylesheet(all_css[css_filename])
for rule in rules:
if rule.type == 'at-rule': # which define fonts
for token in rule.content:
if token.type == "url":
font_url = token.value
if "?" not in font_url and "#" not in font_url:
request_url = urllib.parse.urljoin(
self._root_url, all_css_directory[css_filename] + "/" + font_url)
response = requests.get(request_url)
assert response.status_code == 200
font_filename = font_url.split("/")[-1]
assert font_filename not in all_fonts, "Cannot have two font files with the same name"
all_fonts[font_filename] = response.content
all_css[css_filename] = all_css[css_filename].replace(font_url, font_filename)

return all_css, all_fonts

def get_cleaned_html_source(self) -> str:
"""
Get a cleaned HTML source code of a page of the turing instance for local download.
The HTML code is preprocessed as follows:
- the path of any css should be flattened to the one returned by get_css_sources.
- the path of any auxiliary file should be flattened to the one returned by get_auxiliary_files.
- any local link to the live instance is removed, since it would not be available locally.
- any javascript is removed, since in order to be visible locally the page cannot contain
any script that requires the live server.
Expand All @@ -283,13 +387,38 @@ def get_cleaned_html_source(self) -> str:
# Return postprocessed page
return str(soup)


class JavascriptVariableEvaluatesToTrue:
"""Helper class used to wait until a javascript variable is true."""

def __init__(self, variable: str) -> None:
self._variable = variable

def __call__(self, driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool:
"""Condition for waiting until the javascript variable is true."""
return driver.execute_script(f"return {self._variable};") # type: ignore[no-any-return, no-untyped-call]
def freeze_time(self, current_time: datetime.datetime) -> None:
"""Freeze the race time at the specified time."""
javascript_timestamp = int(current_time.timestamp() * 1000)
# Overwrite the javascript timer
self._browser.execute_script(f"""\
if (!'timer_backup' in document.client) {{
document.client.timer_backup = document.client.timer.now;
}}
document.client.timer.now = function mock_timer() {{
return {javascript_timestamp};
}}""") # type: ignore[no-untyped-call]
# Force an update of the race time, and wait for the updated event to be triggered
# in order to be sure that the classification has been updated
self._browser.execute_script("""\
document.client.gara.time = document.client.timer.now()
document.updated = false;""") # type: ignore[no-untyped-call]
self._wait_for_classification_computed()

def unfreeze_time(self) -> None:
"""Undo a previous freeze of the race time."""
has_timer_backup = self._browser.execute_script( # type: ignore[no-untyped-call]
"return 'timer_backup' in document.client;""")
if not has_timer_backup:
raise RuntimeError("Did you forget to freeze the time?")
# Restore the javascript timer
self._browser.execute_script("""\
document.client.timer.now = document.client.timer_backup;
delete document.client.timer_backup;""") # type: ignore[no-untyped-call]
# Force an update of the race time, and wait for the updated event to be triggered
# in order to be sure that the classification has been updated
self._browser.execute_script("""\
document.client.gara.time = document.client.timer.now()
document.updated = false;""") # type: ignore[no-untyped-call]
self._wait_for_classification_computed()
4 changes: 3 additions & 1 deletion mathrace_interaction/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ dependencies = [
"bs4",
"jsondiff",
"paramiko",
"prettytable",
"requests",
"selenium"
"selenium",
"tinycss2"
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,62 +149,78 @@ def test_classification_browser_get_teams_score_integration_non_default_race_tim


@pytest.mark.parametrize("computation_rate_prefix", ["00:00:0", ""])
def test_classification_browser_get_teams_score_integration_non_default_computation_rate( # type: ignore[no-any-unimported]
def test_classification_browser_get_teams_score_integration_over_time( # type: ignore[no-any-unimported]
live_server: pytest_django.live_server_helper.LiveServer, simple_turing_race: engine.models.Gara,
admin_user: engine.models.User, computation_rate_prefix: str
) -> None:
"""Test team score computation with non default computation."""
"""Test team score computation with non default computation rate and freeze/unfreeze time."""
simple_turing_race.admin = admin_user
simple_turing_race.save()

# Open two browsers
# Open three browsers
browser1 = Browser(live_server, simple_turing_race.pk)
browser1.login(admin_user)
browser2 = Browser(live_server, simple_turing_race.pk)
browser2.login(admin_user)
browser3 = Browser(live_server, simple_turing_race.pk)
browser3.login(admin_user)

# Set race time to a time which is just before the first answer submission
base_querystring = {"race_time": "00:05:28", "ended": "false"}

# Set a computation rate of 1 second in the first browser, and higher than 1 seconds in the second browser.
# Set a computation rate of 1 second in the first browser, and higher than 1 seconds in the other two browsers.
querystring1 = {"computation_rate": computation_rate_prefix + "1"} | base_querystring
querystring2 = {"computation_rate": computation_rate_prefix + "8"} | base_querystring
querystring23 = {"computation_rate": computation_rate_prefix + "8"} | base_querystring
browser1.go_to_classification_page("squadre", querystring1)
browser2.go_to_classification_page("squadre", querystring2)
browser2.go_to_classification_page("squadre", querystring23)
browser3.go_to_classification_page("squadre", querystring23)

# Compute scores before the first answer submission
browser1.lock()
browser2.lock()
browser3.lock()
scores1 = browser1.get_teams_score()
scores2 = browser2.get_teams_score()
scores3 = browser2.get_teams_score()
assert scores1 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
assert scores2 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
assert scores3 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
browser1.unlock()
browser2.unlock()
browser3.unlock()

# Wait until the first browser registers the first answer submission
browser1._wait_for_classification_timer("00:05:31")

# Ensure that the second browser has not registered yet the first answer submission
# Ensure that the other two browsers have not registered yet the first answer submission
timer_to_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds
assert timer_to_int(
browser2.find_element(selenium.webdriver.common.by.By.ID, "orologio").text) < timer_to_int("00:05:30")
assert timer_to_int(
browser3.find_element(selenium.webdriver.common.by.By.ID, "orologio").text) < timer_to_int("00:05:30")

# Freeze time for the third browser (but not for the second)
browser3.freeze_time()

# Lock scores at the time of the first answer submission.
# Lock page content at the time of the first answer submission.
# Only the first browser will see the difference in the scores.
browser1.lock()
browser2.lock()
browser3.lock()
scores1 = browser1.get_teams_score()
scores2 = browser2.get_teams_score()
scores3 = browser3.get_teams_score()
assert scores1 == [70, 70, 70, 70, 115, 70, 70, 70, 70, 70]
assert scores2 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
assert scores3 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
browser1.unlock()
browser2.unlock()
browser3.unlock()

# Wait for the second browser to register the first answer submission as well
browser1._wait_for_classification_timer("00:05:40")
browser2._wait_for_classification_timer("00:05:32")

# Lock scores at this final time: both browser will see the updated scores.
# Lock page content at this final time: browser 1 and 2 will see now the same scores.
browser1.lock()
browser2.lock()
scores1 = browser1.get_teams_score()
Expand All @@ -214,6 +230,20 @@ def test_classification_browser_get_teams_score_integration_non_default_computat
browser1.unlock()
browser2.unlock()

# Browser 3 instead will still see the scores before the first submission.
browser3.lock()
scores3 = browser3.get_teams_score()
assert scores3 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70]
browser3.unlock()

# Time needs to be unfrozen for the third browser to see the updated scores.
browser3.unfreeze_time()
browser3._wait_for_classification_timer("00:05:32")
browser3.lock()
scores3 = browser3.get_teams_score()
assert scores3 == [70, 70, 70, 70, 115, 70, 70, 70, 70, 70]
browser3.unlock()


def test_classification_browser_get_teams_position_integration( # type: ignore[no-any-unimported]
live_server: pytest_django.live_server_helper.LiveServer, simple_turing_race: engine.models.Gara,
Expand Down
Loading

0 comments on commit f2a0ede

Please sign in to comment.