diff --git a/raydar/task_tracker/task_tracker.py b/raydar/task_tracker/task_tracker.py
index 6ba3c76..2f779b1 100644
--- a/raydar/task_tracker/task_tracker.py
+++ b/raydar/task_tracker/task_tracker.py
@@ -1,16 +1,20 @@
 import asyncio
 import coolname
+import datetime
 import itertools
 import logging
 import os
 import pandas as pd
 import polars as pl
 import ray
-from collections.abc import Iterable
+import re
+import requests
+import time
+from collections import defaultdict
+from dataclasses import dataclass
 from packaging.version import Version
+from prometheus_client.openmetrics import parser
 from ray.serve import shutdown
-from ray.util.state.common import TaskState
-from typing import Optional
 
 from .schema import schema as default_schema
 
@@ -30,18 +34,10 @@ class AsyncMetadataTrackerCallback:
     back to that actor.
     """
 
-    def __init__(self, name: str, namespace: str):
+    def __init__(self, name, namespace):
         self.actor = ray.get_actor(name, namespace)
 
-    def process(self, obj_refs: Iterable[ray.ObjectRef]) -> None:
-        """Processes an interable collection of ray.ObjectRefs.
-
-        Iterates through the collection, finds completed references, and returns those references to the
-        self.actor attribute via its .callback remote function.
-
-        Args:
-            obj_refs: An iterable collection of (possibly) in-progress ray object references
-        """
+    def process(self, obj_refs):
         active_tasks = set(obj_refs)
         while len(active_tasks) > 0:
             finished_tasks = []
@@ -55,8 +51,7 @@ def process(self, obj_refs: Iterable[ray.ObjectRef]) -> None:
             self.actor.callback.remote(finished_tasks)
         return
 
-    def exit(self) -> None:
-        """Terminate this actor"""
+    def exit(self):
         ray.actor.exit_actor()
 
 
@@ -64,10 +59,11 @@ def exit(self) -> None:
 class AsyncMetadataTracker:
     def __init__(
         self,
-        name: str,
-        namespace: str,
-        path: Optional[str] = None,
-        enable_perspective_dashboard: bool = False,
+        name,
+        namespace,
+        path=None,
+        enable_perspective_dashboard=False,
+        scrape_prometheus_metrics=False,
     ):
         """An async Ray Actor Class to track task level metadata.
 
@@ -117,8 +113,8 @@ def __init__(
             )
             if Version(ray.__version__) < Version("2.10"):
                 kwargs["port"] = os.environ.get("RAYDAR_PORT", 8000)
-
             self.webserver = ray.serve.run(**kwargs)
+
             self.proxy_server = ray.serve.run(
                 PerspectiveProxyRayServer.bind(self.webserver),
                 name="proxy",
@@ -149,20 +145,87 @@ def __init__(
                     "error_message": "str",
                 },
             )
-
-    def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle:
-        """A getter for this actors proxy server attribute. Can be used to create custom perspective visuals.
-        Returns: this actors proxy_server attribute
-        """
+        if scrape_prometheus_metrics:
+            self.__scraping_job = self.scrape_prometheus_metrics()
+
+    def scrape_prometheus_metrics(self):
+        @dataclass
+        class ParsedOpenMetricsData:
+            metric_name: str
+            metric_description: str
+            metric_type: str
+            metric_value: str
+            metric_metadata: str
+
+        def parse_response(text):
+            parsed_data = []
+            metric_name = None
+            metric_description = None
+            for line in text.split("\n"):
+                if len(line) > 0:
+                    if line.startswith("# HELP "):
+                        metric_description = " ".join(line.split(" ")[3:])
+                    elif line.startswith("# TYPE "):
+                        _, _, metric_name, metric_type = line.split(" ")
+                    else:
+                        matches = re.search(r".*\{(.*)\}(.*)", line)
+                        if matches is not None:
+                            metric_metadata, metric_value = matches.groups()
+                            metric_metadata = parser._parse_labels_with_state_machine(metric_metadata)[0]
+                        else:
+                            _, metric_value = line.split(" ")
+                            metric_metadata = dict()
+                        parsed_data.append(
+                            ParsedOpenMetricsData(
+                                metric_name=metric_name,
+                                metric_description=metric_description,
+                                metric_type=metric_type,
+                                metric_value=eval(metric_value),
+                                metric_metadata=metric_metadata,
+                            )
+                        )
+            return parsed_data
+
+        @ray.remote
+        def scrape():
+            metrics = set()
+            while True:
+                time.sleep(2)
+                for node in ray.nodes():
+                    all_values = defaultdict(list)
+                    if node.get("Alive", False):
+                        node_manager_address = node.get("NodeManagerAddress")
+                        metrics_export_port = node.get("MetricsExportPort")
+                        response = requests.get(f"http://{node_manager_address}:{metrics_export_port}/metrics")
+                        if response.status_code == 200:
+                            parsed_values = parse_response(response.text)
+                            for parsed_value in parsed_values:
+                                data = dict(
+                                    metric_name=parsed_value.metric_name,
+                                    metric_description=parsed_value.metric_description,
+                                    metric_type=parsed_value.metric_type,
+                                    metric_value=parsed_value.metric_value,
+                                    timestamp=datetime.datetime.now(),
+                                )
+                                for key, value in parsed_value.metric_metadata.items():
+                                    data[key] = value
+                                all_values[parsed_value.metric_name].append(data)
+
+                    for key, values in all_values.items():
+                        if key not in metrics:
+                            metrics.add(key)
+                            self.proxy_server.remote("new", key, values)
+                        else:
+                            self.proxy_server.remote("update", key, values)
+
+        return scrape.remote()
+
+    def get_proxy_server(self):
         if self.proxy_server:
             return self.proxy_server
         raise Exception("This task_tracker has no active proxy_server.")
 
-    def callback(self, tasks: Iterable[ray.ObjectRef]) -> None:
-        """A remote function used by this actor's processor actor attribute. Will be called by a separate actor
-        with a collection of ray object references once those ObjectReferences are not in the "RUNNING" or
-        "PENDING" state.
-        """
+    def callback(self, tasks):
         # WARNING: Do not move this import. Importing these modules elsewhere can cause
         # difficult to diagnose, "There is no current event loop in thread 'ray_client_server_" errors.
         asyncio.set_event_loop(asyncio.new_event_loop())
@@ -203,13 +266,15 @@ def metadata_filter(task) -> bool:
         if self.perspective_dashboard_enabled:
             self.update_perspective_dashboard(completed_tasks)
 
-    def update_perspective_dashboard(self, completed_tasks: Iterable[TaskState]) -> None:
+    def update_perspective_dashboard(self, completed_tasks):
         """A helper function, which updates this actor's proxy_server attribute with processed data.
         That proxy_server serves perspective tables which anticipate the data formats we provide.
 
         Args:
-            completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the ObjectReferences are neither Running nor Pending Assignment.
+            completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the
+                ObjectReferences are neither Running nor Pending Assignment.
+
         """
         data = [
             dict(
@@ -237,7 +302,7 @@ def update_perspective_dashboard(self, completed_tasks: Iterable[TaskState]) ->
         ]
         self.proxy_server.remote("update", self.perspective_table_name, data)
 
-    async def process(self, obj_refs: Iterable[ray.ObjectRef], metadata: Optional[Iterable[str]] = None, chunk_size: int = 25_000) -> None:
+    async def process(self, obj_refs, metadata=None, chunk_size=25_000):
         """An asynchronous function to process a collection of Ray object references. Sends sub-collections of
         object references of size chunk_size to its AsyncMetadataTrackerCallback actor.
 
@@ -253,8 +318,7 @@ async def process(self, obj_refs: Iterable[ray.ObjectRef], metadata: Optional[It
         for i in range(0, len(obj_refs), chunk_size):
             self.processor.process.remote(obj_refs[i : i + chunk_size])
 
-    def get_df(self) -> pl.DataFrame:
-        """Retrieves an internally maintained dataframe of task related information pulled from the ray GCS"""
+    def get_df(self):
         self.df = pl.DataFrame(
             data={
                 # fmt: off
@@ -288,8 +352,7 @@ def get_df(self) -> pl.DataFrame:
         )
         return self.df
 
-    def save_df(self) -> None:
-        """Saves the internally maintained dataframe of task related information from the ray GCS"""
+    def save_df(self):
         self.get_df()
         if self.path is not None and self.df is not None:
             logger.info(f"Writing DataFrame to {self.path}")
@@ -297,8 +360,7 @@ def save_df(self) -> None:
             self.df.write_parquet(self.path)
             return True
         return False
 
-    def clear_df(self) -> None:
-        """Clears the internally maintained dataframe of task related information from the ray GCS"""
+    def clear_df(self):
         self.df = None
         self.finished_tasks = {}
         if self.perspective_dashboard_enabled:
@@ -332,12 +394,10 @@ def __init__(self, name: str = "task_tracker", namespace: str = None, **kwargs):
             **kwargs,
         )
 
-    def process(self, object_refs: Iterable[ray.ObjectRef], metadata: Optional[Iterable[str]] = None, chunk_size: int = 25_000) -> None:
-        """A helper function, to send this object's AsyncMetadataTracker actor a collection of object references to track"""
+    def process(self, object_refs, metadata=None, chunk_size=25_000):
         self.tracker.process.remote(object_refs, metadata=metadata, chunk_size=chunk_size)
 
-    def get_df(self, process_user_metadata_column=False) -> pl.DataFrame:
-        """Fetches this object's AsyncMetadataTracker's internal dataframe object"""
+    def get_df(self, process_user_metadata_column=False):
         df = ray.get(self.tracker.get_df.remote())
         if process_user_metadata_column:
             user_metadata_frame = pl.from_pandas(pd.json_normalize(df["user_defined_metadata"].to_pandas()))
@@ -345,20 +405,16 @@ def get_df(self, process_user_metadata_column=False) -> pl.DataFrame:
             return df_with_user_metadata
         return df
 
-    def save_df(self) -> None:
-        """Save the dataframe used by this object's AsyncMetadataTracker actor"""
+    def save_df(self):
         return ray.get(self.tracker.save_df.remote())
 
-    def clear(self) -> None:
-        """Clear the dataframe used by this object's AsyncMetadataTracker actor"""
+    def clear(self):
         return ray.get(self.tracker.clear_df.remote())
 
-    def proxy_server(self) -> ray.serve.handle.DeploymentHandle:
-        """Fetch the proxy server used by this object's AsyncMetadataTracker actor"""
+    def proxy_server(self):
         return ray.get(self.tracker.get_proxy_server.remote())
 
-    def exit(self) -> None:
-        """Perform cleanup tasks, kill associated actors, and shutdown."""
+    def exit(self):
         ray.kill(ray.get_actor(name=self.name, namespace=self.namespace))
         ray.kill(ray.get_actor(name=get_callback_actor_name(self.name), namespace=self.namespace))
         shutdown()
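
Usage note (not part of the patch): a minimal sketch of how the new scrape_prometheus_metrics flag could be exercised end to end. It assumes the wrapper class whose __init__ appears in the last two hunks is exported as raydar.RayTaskTracker, and that the **kwargs visible in the hunk context are forwarded to AsyncMetadataTracker; both the export path and the forwarding behaviour are assumptions rather than facts shown in this diff.

    # Hedged sketch -- assumes raydar exports the wrapper as RayTaskTracker and forwards
    # extra keyword arguments (including scrape_prometheus_metrics) to the actor class.
    import ray
    from raydar import RayTaskTracker  # export path assumed

    ray.init()

    tracker = RayTaskTracker(
        enable_perspective_dashboard=True,   # the scraper pushes into the perspective proxy_server
        scrape_prometheus_metrics=True,      # flag introduced in this diff
    )

    @ray.remote
    def work(i):
        return i * i

    refs = [work.remote(i) for i in range(100)]
    tracker.process(refs)

    # Task metadata is still exposed as a polars DataFrame, while each scraped Prometheus
    # metric is pushed to a perspective table named after the metric.
    print(tracker.get_df().head())

Because the scraping task pushes rows via self.proxy_server, which the hunks suggest is only created on the perspective-dashboard code path, enabling both flags together appears to be the safer combination.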