Skip to content

Commit

Permalink
update query utility to support non-integer jobids (e.g. flux) and
Browse files Browse the repository at this point in the history
query intervales less than 1 sec.

Signed-off-by: Karl W. Schulz <[email protected]>
  • Loading branch information
koomie committed Dec 19, 2024
1 parent e39cf94 commit 50965a5
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions omnistat/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __del__(self):

def set_options(self, jobID=None, output_file=None, pdf=None, interval=None, step=None):
if jobID:
self.jobID = int(jobID)
self.jobID = jobID
if output_file:
self.output_file = output_file
if pdf:
Expand Down Expand Up @@ -113,7 +113,6 @@ def setup(self):
self.prometheus = PrometheusConnect(url=self.config["prometheus_url"])

# query jobinfo
assert self.jobID > 1
self.jobinfo = self.query_slurm_job_internal()
if self.jobinfo["begin_date"] == "Unknown":
print("Job %s has not run yet." % self.jobID)
Expand Down Expand Up @@ -309,7 +308,11 @@ def get_hosts(self):

assert self.stepNodes > 0
assert self.totalNodes > 0
assert self.totalNodes == self.jobinfo["num_nodes"]
if self.totalNodes != self.jobinfo["num_nodes"]:
logging.warn("")
logging.warn("[WARNING]: telemetry data not collected for all nodes assigned to this job")
logging.warn("--> # assigned hosts = %i" % self.jobinfo["num_nodes"])
logging.warn("--> # of hosts with data = %i" % self.totalNodes)

def metric_host_max_sum(self, values):
"""Determine host with <maximum> sum of all provided samples"""
Expand Down Expand Up @@ -439,7 +442,7 @@ def generate_report_card(self):

print("")
print("-" * 70)
print("Omnistat Report Card for Job # %i" % self.jobID)
print("Omnistat Report Card for Job Id: %s" % self.jobID)
if self.jobStep:
print(
"** Report confined to job step=%s (%i of %i nodes used)"
Expand Down Expand Up @@ -483,7 +486,7 @@ def generate_report_card(self):
print("")

print("--")
print("Query interval = %i secs" % self.interval)
print("Query interval = %.3f secs" % self.interval)
print("Query execution time = %.1f secs" % (timeit.default_timer() - self.timer_start))
version = self.version
# if self.sha != "Unknown":
Expand Down Expand Up @@ -518,7 +521,7 @@ def query_time_series_data(self, metric_name, reducer=None, dataType=float):
for i in range(len(results)):
tmpresult = np.asarray(results[i]["values"])
# convert to time format
tmptime = tmpresult[:, 0].astype(int).astype("datetime64[s]")
tmptime = tmpresult[:, 0].astype(float).astype("datetime64[s]")
if dataType == int:
tmpvalues = tmpresult[:, 1].astype(int)
elif dataType == float:
Expand All @@ -532,7 +535,7 @@ def query_time_series_data(self, metric_name, reducer=None, dataType=float):
results = np.asarray(results[0]["values"])

# convert to time format
time = results[:, 0].astype(int).astype("datetime64[s]")
time = results[:, 0].astype(float).astype("datetime64[s]")
# let user decide on conversion type for gauge metric
if dataType == int:
values = results[:, 1].astype(int)
Expand Down Expand Up @@ -837,7 +840,7 @@ def main():
parser.add_argument("-v", "--version", help="print version info and exit", action="store_true")
parser.add_argument("--job", help="jobId to query")
parser.add_argument("--step", help="SLURM job step to restrict query interval")
parser.add_argument("--interval", type=int, help="sampling interval in secs (default=60)", default=60)
parser.add_argument("--interval", type=float, help="sampling interval in secs (default=30)", default=30)
parser.add_argument("--output", help="location for stdout report")
parser.add_argument("--configfile", type=str, help="runtime config file", default=None)
parser.add_argument("--pdf", help="generate PDF report")
Expand Down

0 comments on commit 50965a5

Please sign in to comment.