Adding prometheus dashboard support
Co-authored-by: Todd Gaugler <[email protected]>
Signed-off-by: Tim Paine <[email protected]>
timkpaine and gauglertodd committed Apr 15, 2024
1 parent 3eca54b commit 67493ec
Showing 1 changed file with 107 additions and 51 deletions.
158 changes: 107 additions & 51 deletions raydar/task_tracker/task_tracker.py
@@ -1,16 +1,20 @@
import asyncio
import coolname
import datetime
import itertools
import logging
import os
import pandas as pd
import polars as pl
import ray
from collections.abc import Iterable
import re
import requests
import time
from collections import defaultdict
from dataclasses import dataclass
from packaging.version import Version
from prometheus_client.openmetrics import parser
from ray.serve import shutdown
from ray.util.state.common import TaskState
from typing import Optional

from .schema import schema as default_schema

@@ -30,18 +34,10 @@ class AsyncMetadataTrackerCallback:
back to that actor.
"""

def __init__(self, name: str, namespace: str):
def __init__(self, name, namespace):
self.actor = ray.get_actor(name, namespace)

def process(self, obj_refs: Iterable[ray.ObjectRef]) -> None:
"""Processes an interable collection of ray.ObjectRefs.
Iterates through the collection, finds completed references, and returns those references to the
self.actor attribute via its .callback remote function.
Args:
obj_refs: An iterable collection of (possibly) in-progress ray object references
"""
def process(self, obj_refs):
active_tasks = set(obj_refs)
while len(active_tasks) > 0:
finished_tasks = []
@@ -55,19 +51,19 @@ def process(self, obj_refs: Iterable[ray.ObjectRef]) -> None:
self.actor.callback.remote(finished_tasks)
return

def exit(self) -> None:
"""Terminate this actor"""
def exit(self):
ray.actor.exit_actor()


@ray.remote(resources={"node:__internal_head__": 0.1}, num_cpus=0)
class AsyncMetadataTracker:
def __init__(
self,
name: str,
namespace: str,
path: Optional[str] = None,
enable_perspective_dashboard: bool = False,
name,
namespace,
path=None,
enable_perspective_dashboard=False,
scrape_prometheus_metrics=False,
):
"""An async Ray Actor Class to track task level metadata.
@@ -117,8 +113,8 @@ def __init__(
)
if Version(ray.__version__) < Version("2.10"):
kwargs["port"] = os.environ.get("RAYDAR_PORT", 8000)

self.webserver = ray.serve.run(**kwargs)

self.proxy_server = ray.serve.run(
PerspectiveProxyRayServer.bind(self.webserver),
name="proxy",
@@ -149,20 +145,87 @@ def __init__(
"error_message": "str",
},
)

    def get_proxy_server(self) -> ray.serve.handle.DeploymentHandle:
        """A getter for this actor's proxy_server attribute. Can be used to create custom perspective visuals.
        Returns: this actor's proxy_server attribute
        """
        if scrape_prometheus_metrics:
            self.__scraping_job = self.scrape_prometheus_metrics()

    def scrape_prometheus_metrics(self):
        @dataclass
        class ParsedOpenMetricsData:
            metric_name: str
            metric_description: str
            metric_type: str
            metric_value: str
            metric_metadata: str

        def parse_response(text):
            parsed_data = []
            metric_name = None
            metric_description = None
            for line in text.split("\n"):
                if len(line) > 0:
                    if line.startswith("# HELP "):
                        metric_description = " ".join(line.split(" ")[3:])
                    elif line.startswith("# TYPE "):
                        _, _, metric_name, metric_type = line.split(" ")
                    else:
                        matches = re.search(r".*\{(.*)\}(.*)", line)
                        if matches is not None:
                            metric_metadata, metric_value = matches.groups()
                            metric_metadata = parser._parse_labels_with_state_machine(metric_metadata)[0]
                        else:
                            _, metric_value = line.split(" ")
                            metric_metadata = dict()
                        parsed_data.append(
                            ParsedOpenMetricsData(
                                metric_name=metric_name,
                                metric_description=metric_description,
                                metric_type=metric_type,
                                metric_value=eval(metric_value),
                                metric_metadata=metric_metadata,
                            )
                        )
            return parsed_data

        @ray.remote
        def scrape():
            metrics = set()
            while True:
                time.sleep(2)
                for node in ray.nodes():
                    all_values = defaultdict(list)
                    if node.get("Alive", False):
                        node_manager_address = node.get("NodeManagerAddress")
                        metrics_export_port = node.get("MetricsExportPort")
                        response = requests.get(f"http://{node_manager_address}:{metrics_export_port}/metrics")
                        if response.status_code == 200:
                            parsed_values = parse_response(response.text)
                            for parsed_value in parsed_values:
                                data = dict(
                                    metric_name=parsed_value.metric_name,
                                    metric_description=parsed_value.metric_description,
                                    metric_type=parsed_value.metric_type,
                                    metric_value=parsed_value.metric_value,
                                    timestamp=datetime.datetime.now(),
                                )
                                for key, value in parsed_value.metric_metadata.items():
                                    data[key] = value
                                all_values[parsed_value.metric_name].append(data)

                    for key, values in all_values.items():
                        if key not in metrics:
                            metrics.add(key)
                            self.proxy_server.remote("new", key, values)
                        else:
                            self.proxy_server.remote("update", key, values)

        return scrape.remote()

    def get_proxy_server(self):
        if self.proxy_server:
            return self.proxy_server
        raise Exception("This task_tracker has no active proxy_server.")

    def callback(self, tasks: Iterable[ray.ObjectRef]) -> None:
        """A remote function used by this actor's processor actor attribute. Will be called by a separate actor
        with a collection of ray object references once those ObjectReferences are not in the "RUNNING" or
        "PENDING" state.
        """
    def callback(self, tasks):
        # WARNING: Do not move this import. Importing these modules elsewhere can cause
        # difficult to diagnose, "There is no current event loop in thread 'ray_client_server_" errors.
        asyncio.set_event_loop(asyncio.new_event_loop())
@@ -203,13 +266,15 @@ def metadata_filter(task) -> bool:
if self.perspective_dashboard_enabled:
self.update_perspective_dashboard(completed_tasks)

def update_perspective_dashboard(self, completed_tasks: Iterable[TaskState]) -> None:
def update_perspective_dashboard(self, completed_tasks):
"""A helper function, which updates this actor's proxy_server attribute with processed data.
That proxy_server serves perspective tables which anticipate the data formats we provide.
Args:
completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the ObjectReferences are neither Running nor Pending Assignment.
completed_tasks: A list of tuples of the form (ObjectReference, TaskMetadata), where the
ObjectReferences are neither Running nor Pending Assignment.
"""
data = [
dict(
@@ -237,7 +302,7 @@ def update_perspective_dashboard(self, completed_tasks: Iterable[TaskState]) ->
]
self.proxy_server.remote("update", self.perspective_table_name, data)

async def process(self, obj_refs: Iterable[ray.ObjectRef], metadata: Optional[Iterable[str]] = None, chunk_size: int = 25_000) -> None:
async def process(self, obj_refs, metadata=None, chunk_size=25_000):
"""An asynchronous function to process a collection of Ray object references.
Sends sub-collections of object references of size chunk_size to its AsyncMetadataTrackerCallback actor.
@@ -253,8 +318,7 @@ async def process(self, obj_refs: Iterable[ray.ObjectRef], metadata: Optional[It
for i in range(0, len(obj_refs), chunk_size):
self.processor.process.remote(obj_refs[i : i + chunk_size])

def get_df(self) -> pl.DataFrame:
"""Retrieves an internally maintained dataframe of task related information pulled from the ray GCS"""
def get_df(self):
self.df = pl.DataFrame(
data={
# fmt: off
@@ -288,17 +352,15 @@ def get_df(self) -> pl.DataFrame:
)
return self.df

def save_df(self) -> None:
"""Saves the internally maintained dataframe of task related information from the ray GCS"""
def save_df(self):
self.get_df()
if self.path is not None and self.df is not None:
logger.info(f"Writing DataFrame to {self.path}")
self.df.write_parquet(self.path)
return True
return False

def clear_df(self) -> None:
"""Clears the internally maintained dataframe of task related information from the ray GCS"""
def clear_df(self):
self.df = None
self.finished_tasks = {}
if self.perspective_dashboard_enabled:
@@ -332,33 +394,27 @@ def __init__(self, name: str = "task_tracker", namespace: str = None, **kwargs):
**kwargs,
)

def process(self, object_refs: Iterable[ray.ObjectRef], metadata: Optional[Iterable[str]] = None, chunk_size: int = 25_000) -> None:
"""A helper function, to send this object's AsyncMetadataTracker actor a collection of object references to track"""
def process(self, object_refs, metadata=None, chunk_size=25_000):
self.tracker.process.remote(object_refs, metadata=metadata, chunk_size=chunk_size)

def get_df(self, process_user_metadata_column=False) -> pl.DataFrame:
"""Fetches this object's AsyncMetadataTracker's internal dataframe object"""
def get_df(self, process_user_metadata_column=False):
df = ray.get(self.tracker.get_df.remote())
if process_user_metadata_column:
user_metadata_frame = pl.from_pandas(pd.json_normalize(df["user_defined_metadata"].to_pandas()))
df_with_user_metadata = pl.concat([df, user_metadata_frame], how="horizontal")
return df_with_user_metadata
return df

def save_df(self) -> None:
"""Save the dataframe used by this object's AsyncMetadataTracker actor"""
def save_df(self):
return ray.get(self.tracker.save_df.remote())

def clear(self) -> None:
"""Clear the dataframe used by this object's AsyncMetadataTracker actor"""
def clear(self):
return ray.get(self.tracker.clear_df.remote())

def proxy_server(self) -> ray.serve.handle.DeploymentHandle:
"""Fetch the proxy server used by this object's AsyncMetadataTracker actor"""
def proxy_server(self):
return ray.get(self.tracker.get_proxy_server.remote())

def exit(self) -> None:
"""Perform cleanup tasks, kill associated actors, and shutdown."""
def exit(self):
ray.kill(ray.get_actor(name=self.name, namespace=self.namespace))
ray.kill(ray.get_actor(name=get_callback_actor_name(self.name), namespace=self.namespace))
shutdown()
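
For context, a usage sketch of the new flag (not part of this commit): the wrapper class whose methods appear at the bottom of the diff forwards **kwargs to the AsyncMetadataTracker actor, so Prometheus scraping can be switched on from the public constructor. The class name RayTaskTracker and the import path are assumed here, since the class statement falls outside the visible hunks.

# Usage sketch, not part of this commit. Class name and import path assumed.
import ray
from raydar import RayTaskTracker  # assumed import path

ray.init()  # or connect to an existing cluster

tracker = RayTaskTracker(
    enable_perspective_dashboard=True,  # existing flag: serve the perspective dashboard
    scrape_prometheus_metrics=True,     # new flag added in this commit
)

@ray.remote
def work(i):
    return i * i

refs = [work.remote(i) for i in range(100)]
tracker.process(refs)  # track task-level metadata as the refs complete
df = tracker.get_df()  # polars DataFrame of task metadata pulled from the ray GCS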

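The new scrape() task polls each node's metrics endpoint every two seconds and parses the Prometheus text it returns. Below is a rough standalone sketch of a single pass of that loop; it uses prometheus_client's public text parser rather than the regex plus internal label-parsing helper used in the commit, and it assumes a running Ray cluster. The exact metric and label names in the output depend on the Ray version.

# Rough sketch, not part of this commit: one pass of the per-node scrape,
# using prometheus_client's public text parser.
import datetime

import ray
import requests
from prometheus_client.parser import text_string_to_metric_families

ray.init(address="auto")  # assumes a running Ray cluster

for node in ray.nodes():
    if not node.get("Alive", False):
        continue
    url = f"http://{node['NodeManagerAddress']}:{node['MetricsExportPort']}/metrics"
    response = requests.get(url, timeout=5)
    if response.status_code != 200:
        continue
    for family in text_string_to_metric_families(response.text):
        for sample in family.samples:
            # Roughly the row shape that scrape() accumulates per metric name
            # before pushing it to the perspective proxy server.
            row = dict(
                metric_name=family.name,
                metric_description=family.documentation,
                metric_type=family.type,
                metric_value=sample.value,
                timestamp=datetime.datetime.now(),
                **sample.labels,
            )
            print(row)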