diff --git a/mathrace_interaction/mathrace_interaction/network/turing_classification_selenium.py b/mathrace_interaction/mathrace_interaction/network/turing_classification_selenium.py index a68095e..32c6572 100644 --- a/mathrace_interaction/mathrace_interaction/network/turing_classification_selenium.py +++ b/mathrace_interaction/mathrace_interaction/network/turing_classification_selenium.py @@ -5,10 +5,12 @@ # SPDX-License-Identifier: AGPL-3.0-or-later """A selenium browser that connects to a classification page on the current live turing instance.""" +import datetime import typing import urllib.parse import bs4 +import prettytable import requests import selenium.common.exceptions import selenium.webdriver @@ -17,8 +19,9 @@ import selenium.webdriver.remote.webelement import selenium.webdriver.support.expected_conditions as EC # noqa: N812 import selenium.webdriver.support.ui +import tinycss2 -import mathrace_interaction.time +from mathrace_interaction.time import convert_timestamp_to_number_of_seconds class TuringClassificationSelenium: @@ -141,12 +144,12 @@ def timer_above_value( locator: tuple[str, str] ) -> typing.Callable[[selenium.webdriver.remote.webdriver.WebDriver], bool]: """Predicate used in WebDriverWait, inspired by EC.text_to_be_present_in_element.""" - value_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds(value) + value_int = convert_timestamp_to_number_of_seconds(value) def _predicate(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool: try: element_text = driver.find_element(*locator).text - element_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds(element_text) + element_int = convert_timestamp_to_number_of_seconds(element_text) return element_int >= value_int except selenium.common.exceptions.StaleElementReferenceException: # pragma: no cover return False @@ -159,8 +162,15 @@ def _predicate(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool: def _wait_for_classification_computed(self) -> None: """Wait for classification computation.""" self.ensure_unlocked() - selenium.webdriver.support.wait.WebDriverWait(self._browser, self._max_wait).until( - JavascriptVariableEvaluatesToTrue("document.updated")) + + def document_updated_is_true(driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool: + try: + return driver.execute_script( # type: ignore[no-any-return, no-untyped-call] + "return document.updated;") + except selenium.common.exceptions.StaleElementReferenceException: # pragma: no cover + return False + + selenium.webdriver.support.wait.WebDriverWait(self._browser, self._max_wait).until(document_updated_is_true) def login(self, username: str, password: str) -> None: """Log into the turing instance with the provided credentials.""" @@ -171,7 +181,14 @@ def login(self, username: str, password: str) -> None: self.find_element(selenium.webdriver.common.by.By.NAME, "password").send_keys(password) self.find_element(selenium.webdriver.common.by.By.ID, "submit").click() # Successful login redirects to the home page, where there is a link to change password - self._wait_for_element(selenium.webdriver.common.by.By.CSS_SELECTOR, "a[href='/accounts/password_change/']") + try: + self._wait_for_element( + selenium.webdriver.common.by.By.CSS_SELECTOR, "a[href='/accounts/password_change/']") + except selenium.common.exceptions.TimeoutException: + if "Inserisci nome utente e password corretti" in self.page_source: + raise RuntimeError("Could not login with the provided credentials") + else: + raise def go_to_classification_page(self, classification_type: str, querystring: dict[str, str]) -> None: """Direct the browser to visit a specific classification type.""" @@ -199,6 +216,68 @@ def ensure_classification_type(self, classification_type: str) -> None: if not self._browser.current_url.startswith(expected_url): raise RuntimeError(f"The current page is not a {classification_type} classification") + def get_table(self) -> prettytable.PrettyTable: + """Get the table representing the unica classification.""" + self.ensure_locked() + assert self._locked_page_soup is not None + self.ensure_classification_type("unica") + table = prettytable.PrettyTable() + # Get the headers first + timer_elements = self._locked_page_soup.find_all("h3", id=f"orologio") + assert len(timer_elements) == 1 + header1 = ["Position", "Team ID", "Team name", "Score"] + header2 = ["", "", timer_elements[0].text, ""] + num_questions = 0 + while True: + question_elements = self._locked_page_soup.find_all("th", id=f"pr-{num_questions + 1}") + if len(question_elements) == 0: + break + else: + assert len(question_elements) == 1 + question_text = question_elements[0].text.strip() + if "\n" in question_text: # score for each question goes to a new line + question_number, question_score = question_text.split("\n") + header1.append(question_number) + header2.append(question_score) + else: + header1.append(question_text) + header2.append("") + num_questions += 1 + header1.append("Bonus") + header2.append("") + table.field_names = header1 + if not all(cell == "" for cell in header2): + table.add_row(header2) + # Get the table content, row by row + team_id = 1 + while True: + row = [] + position_elements = self._locked_page_soup.find_all("th", id=f"pos-{team_id}") + if len(position_elements) == 0: + break + assert len(position_elements) == 1 + row.append(int(position_elements[0].text.strip()[:-1])) # :-1 is to drop the trailing degree symbol + team_id_elements = self._locked_page_soup.find_all("th", id=f"num-{team_id}") + assert len(team_id_elements) == 1 + row.append(int(team_id_elements[0].text)) + team_name_elements = self._locked_page_soup.find_all("th", id=f"nome-{team_id}") + assert len(team_name_elements) == 1 + row.append(team_name_elements[0].text) + team_score_elements = self._locked_page_soup.find_all("th", id=f"punt-{team_id}") + assert len(team_score_elements) == 1 + row.append(int(team_score_elements[0].text)) + for q in [*range(1, num_questions + 1), "bonus"]: + question_score_elements = self._locked_page_soup.find_all("td", id=f"cell-{team_id}-{q}") + assert len(question_score_elements) == 1 + question_score = question_score_elements[0].text.strip() + if question_score != "": + row.append(int(question_score)) + else: + row.append("") + table.add_row(row) + team_id += 1 + return table + def get_teams_score(self) -> list[int]: """Get the score of the teams in the race.""" self.ensure_locked() @@ -233,30 +312,55 @@ def get_teams_position(self) -> list[int]: team_id += 1 return positions - def get_css_sources(self) -> dict[str, str]: - """Get the content of CSS files used in the current page.""" + def get_auxiliary_files(self) -> dict[str, str | bytes]: + """Get the content of CSS and font files used in the current page.""" self.ensure_locked() assert self._locked_page_soup is not None + + # Get css files first all_css = dict() + all_css_directory = dict() for css in self._locked_page_soup.find_all("link", rel="stylesheet"): # Do not use the current selenium browser to fetch the css content, otherwise # the browser would move away from the current page. However, since css content # is static, simply downloading the page via the python package requests suffices. - response = requests.get(urllib.parse.urljoin(self._root_url, css["href"])) + request_url = urllib.parse.urljoin(self._root_url, css["href"]) + response = requests.get(request_url) assert response.status_code == 200 - filename = css["href"].split("/")[-1] - assert filename not in all_css, "Cannot have to css files with the same name" + directory, filename = css["href"].rsplit("/", 1) + assert filename not in all_css, "Cannot have two css files with the same name" all_css[filename] = response.text - - return all_css + all_css_directory[filename] = directory + + # Next, process each css file to extract the fonts that are required there + all_fonts = dict() + + for css_filename in all_css.keys(): + rules = tinycss2.parse_stylesheet(all_css[css_filename]) + for rule in rules: + if rule.type == 'at-rule': # which define fonts + for token in rule.content: + if token.type == "url": + font_url = token.value + if "?" not in font_url and "#" not in font_url: + request_url = urllib.parse.urljoin( + self._root_url, all_css_directory[css_filename] + "/" + font_url) + response = requests.get(request_url) + assert response.status_code == 200 + font_filename = font_url.split("/")[-1] + assert font_filename not in all_fonts, "Cannot have two font files with the same name" + all_fonts[font_filename] = response.content + all_css[css_filename] = all_css[css_filename].replace(font_url, font_filename) + + return all_css, all_fonts def get_cleaned_html_source(self) -> str: """ Get a cleaned HTML source code of a page of the turing instance for local download. The HTML code is preprocessed as follows: - - the path of any css should be flattened to the one returned by get_css_sources. + - the path of any auxiliary file should be flattened to the one returned by get_auxiliary_files. - any local link to the live instance is removed, since it would not be available locally. - any javascript is removed, since in order to be visible locally the page cannot contain any script that requires the live server. @@ -283,13 +387,38 @@ def get_cleaned_html_source(self) -> str: # Return postprocessed page return str(soup) - -class JavascriptVariableEvaluatesToTrue: - """Helper class used to wait until a javascript variable is true.""" - - def __init__(self, variable: str) -> None: - self._variable = variable - - def __call__(self, driver: selenium.webdriver.remote.webdriver.WebDriver) -> bool: - """Condition for waiting until the javascript variable is true.""" - return driver.execute_script(f"return {self._variable};") # type: ignore[no-any-return, no-untyped-call] + def freeze_time(self, current_time: datetime.datetime) -> None: + """Freeze the race time at the specified time.""" + javascript_timestamp = int(current_time.timestamp() * 1000) + # Overwrite the javascript timer + self._browser.execute_script(f"""\ +if (!'timer_backup' in document.client) {{ + document.client.timer_backup = document.client.timer.now; +}} + +document.client.timer.now = function mock_timer() {{ + return {javascript_timestamp}; +}}""") # type: ignore[no-untyped-call] + # Force an update of the race time, and wait for the updated event to be triggered + # in order to be sure that the classification has been updated + self._browser.execute_script("""\ +document.client.gara.time = document.client.timer.now() +document.updated = false;""") # type: ignore[no-untyped-call] + self._wait_for_classification_computed() + + def unfreeze_time(self) -> None: + """Undo a previous freeze of the race time.""" + has_timer_backup = self._browser.execute_script( # type: ignore[no-untyped-call] + "return 'timer_backup' in document.client;""") + if not has_timer_backup: + raise RuntimeError("Did you forget to freeze the time?") + # Restore the javascript timer + self._browser.execute_script("""\ +document.client.timer.now = document.client.timer_backup; +delete document.client.timer_backup;""") # type: ignore[no-untyped-call] + # Force an update of the race time, and wait for the updated event to be triggered + # in order to be sure that the classification has been updated + self._browser.execute_script("""\ +document.client.gara.time = document.client.timer.now() +document.updated = false;""") # type: ignore[no-untyped-call] + self._wait_for_classification_computed() diff --git a/mathrace_interaction/pyproject.toml b/mathrace_interaction/pyproject.toml index b96bf71..2b5da1e 100644 --- a/mathrace_interaction/pyproject.toml +++ b/mathrace_interaction/pyproject.toml @@ -32,8 +32,10 @@ dependencies = [ "bs4", "jsondiff", "paramiko", + "prettytable", "requests", - "selenium" + "selenium", + "tinycss2" ] [project.urls] diff --git a/mathrace_interaction/tests/integration/network/test_turing_classification_selenium_integration.py b/mathrace_interaction/tests/integration/network/test_turing_classification_selenium_integration.py index 1e6b72c..1a34ed6 100644 --- a/mathrace_interaction/tests/integration/network/test_turing_classification_selenium_integration.py +++ b/mathrace_interaction/tests/integration/network/test_turing_classification_selenium_integration.py @@ -149,62 +149,78 @@ def test_classification_browser_get_teams_score_integration_non_default_race_tim @pytest.mark.parametrize("computation_rate_prefix", ["00:00:0", ""]) -def test_classification_browser_get_teams_score_integration_non_default_computation_rate( # type: ignore[no-any-unimported] +def test_classification_browser_get_teams_score_integration_over_time( # type: ignore[no-any-unimported] live_server: pytest_django.live_server_helper.LiveServer, simple_turing_race: engine.models.Gara, admin_user: engine.models.User, computation_rate_prefix: str ) -> None: - """Test team score computation with non default computation.""" + """Test team score computation with non default computation rate and freeze/unfreeze time.""" simple_turing_race.admin = admin_user simple_turing_race.save() - # Open two browsers + # Open three browsers browser1 = Browser(live_server, simple_turing_race.pk) browser1.login(admin_user) browser2 = Browser(live_server, simple_turing_race.pk) browser2.login(admin_user) + browser3 = Browser(live_server, simple_turing_race.pk) + browser3.login(admin_user) # Set race time to a time which is just before the first answer submission base_querystring = {"race_time": "00:05:28", "ended": "false"} - # Set a computation rate of 1 second in the first browser, and higher than 1 seconds in the second browser. + # Set a computation rate of 1 second in the first browser, and higher than 1 seconds in the other two browsers. querystring1 = {"computation_rate": computation_rate_prefix + "1"} | base_querystring - querystring2 = {"computation_rate": computation_rate_prefix + "8"} | base_querystring + querystring23 = {"computation_rate": computation_rate_prefix + "8"} | base_querystring browser1.go_to_classification_page("squadre", querystring1) - browser2.go_to_classification_page("squadre", querystring2) + browser2.go_to_classification_page("squadre", querystring23) + browser3.go_to_classification_page("squadre", querystring23) # Compute scores before the first answer submission browser1.lock() browser2.lock() + browser3.lock() scores1 = browser1.get_teams_score() scores2 = browser2.get_teams_score() + scores3 = browser2.get_teams_score() assert scores1 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70] assert scores2 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70] + assert scores3 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70] browser1.unlock() browser2.unlock() + browser3.unlock() # Wait until the first browser registers the first answer submission browser1._wait_for_classification_timer("00:05:31") - # Ensure that the second browser has not registered yet the first answer submission + # Ensure that the other two browsers have not registered yet the first answer submission timer_to_int = mathrace_interaction.time.convert_timestamp_to_number_of_seconds assert timer_to_int( browser2.find_element(selenium.webdriver.common.by.By.ID, "orologio").text) < timer_to_int("00:05:30") + assert timer_to_int( + browser3.find_element(selenium.webdriver.common.by.By.ID, "orologio").text) < timer_to_int("00:05:30") + + # Freeze time for the third browser (but not for the second) + browser3.freeze_time() - # Lock scores at the time of the first answer submission. + # Lock page content at the time of the first answer submission. # Only the first browser will see the difference in the scores. browser1.lock() browser2.lock() + browser3.lock() scores1 = browser1.get_teams_score() scores2 = browser2.get_teams_score() + scores3 = browser3.get_teams_score() assert scores1 == [70, 70, 70, 70, 115, 70, 70, 70, 70, 70] assert scores2 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70] + assert scores3 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70] browser1.unlock() browser2.unlock() + browser3.unlock() # Wait for the second browser to register the first answer submission as well - browser1._wait_for_classification_timer("00:05:40") + browser2._wait_for_classification_timer("00:05:32") - # Lock scores at this final time: both browser will see the updated scores. + # Lock page content at this final time: browser 1 and 2 will see now the same scores. browser1.lock() browser2.lock() scores1 = browser1.get_teams_score() @@ -214,6 +230,20 @@ def test_classification_browser_get_teams_score_integration_non_default_computat browser1.unlock() browser2.unlock() + # Browser 3 instead will still see the scores before the first submission. + browser3.lock() + scores3 = browser3.get_teams_score() + assert scores3 == [70, 70, 70, 70, 70, 70, 70, 70, 70, 70] + browser3.unlock() + + # Time needs to be unfrozen for the third browser to see the updated scores. + browser3.unfreeze_time() + browser3._wait_for_classification_timer("00:05:32") + browser3.lock() + scores3 = browser3.get_teams_score() + assert scores3 == [70, 70, 70, 70, 115, 70, 70, 70, 70, 70] + browser3.unlock() + def test_classification_browser_get_teams_position_integration( # type: ignore[no-any-unimported] live_server: pytest_django.live_server_helper.LiveServer, simple_turing_race: engine.models.Gara, diff --git a/mathrace_interaction/tests/unit/network/test_turing_classification_selenium.py b/mathrace_interaction/tests/unit/network/test_turing_classification_selenium.py index a21e901..8e5026d 100644 --- a/mathrace_interaction/tests/unit/network/test_turing_classification_selenium.py +++ b/mathrace_interaction/tests/unit/network/test_turing_classification_selenium.py @@ -311,11 +311,11 @@ def query_function(browser: Browser) -> list[int]: runtime_error_contains(lambda: query_function(browser), "The current page is not a squadre classification") -def test_classification_browser_get_css_sources( +def test_classification_browser_get_auxiliary_files( httpserver: pytest_httpserver.HTTPServer, runtime_error_contains: mathrace_interaction.typing.RuntimeErrorContainsFixtureType ) -> None: - """Test mathrace_interaction.network.TuringClassificationSelenium.get_css_sources.""" + """Test mathrace_interaction.network.TuringClassificationSelenium.get_auxiliary_files.""" index_page = """
@@ -350,8 +350,9 @@ def test_classification_browser_get_css_sources( # Get a dictionary containing the sources of all CSS files browser.lock() - all_css = browser.get_css_sources() + all_css, all_fonts = browser.get_auxiliary_files() assert len(all_css) == 2 + assert len(all_fonts) == 0 assert "style1.css" in all_css assert "style2.css" in all_css assert all_css["style1.css"] == style1_css @@ -514,3 +515,79 @@ def test_classification_browser_wait_for_classification_timer(httpserver: pytest browser._wait_for_classification_timer("00:00:01") assert "00:00:00" not in browser.page_source assert "00:00:01" in browser.page_source + + +def test_classification_browser_freeze_unfreeze_time( + httpserver: pytest_httpserver.HTTPServer, + runtime_error_contains: mathrace_interaction.typing.RuntimeErrorContainsFixtureType +) -> None: + """Test mathrace_interaction.network.TuringClassificationSelenium.freeze_time/unfreeze_time.""" + classification_page = """ + + +Current time is 0 + + +""" + httpserver.expect_request("/engine/classifica/0/unica").respond_with_data( + classification_page, content_type="text/html") + + browser = Browser(httpserver) + browser.go_to_classification_page("unica", {}) + assert "Current time is 0" in browser.page_source + + # Click the button, and verify that the time increases + increment_button = browser.find_element(selenium.webdriver.common.by.By.ID, "increment") + increment_button.click() + assert "Current time is 0" not in browser.page_source + assert "Current time is 1" in browser.page_source + + # Now freeze the time, click the button, and check that the time has not increased + t = browser.freeze_time() + assert t == 1 + increment_button.click() + assert "Current time is 0" not in browser.page_source + assert "Current time is 1" in browser.page_source + + # Now unfreeze the time, click the button, and check that the time increases again + browser.unfreeze_time() + increment_button.click() + assert "Current time is 0" not in browser.page_source + assert "Current time is 1" not in browser.page_source + assert "Current time is 2" in browser.page_source