Merge pull request #10 from ErichDonGubler/parity-with-wpt-fetch-logs
Achieve parity with common `mach wpt-fetch-logs` use cases
ErichDonGubler authored Nov 13, 2023
2 parents b394ca9 + 04a972f commit 09ef418
Showing 1 changed file with 128 additions and 83 deletions.
211 changes: 128 additions & 83 deletions src/main.rs
@@ -1,9 +1,10 @@
 use std::{
     collections::{BTreeMap, HashSet},
-    fs, io,
+    fs,
     num::NonZeroU8,
     path::PathBuf,
     process::exit,
+    str::FromStr,
     sync::{Arc, Mutex},
 };

@@ -132,42 +133,85 @@ struct Job {

 #[derive(Debug, Parser)]
 struct Cli {
+    #[clap(flatten)]
+    options: Options,
+    #[clap(value_parser = RevisionRef::from_str)]
+    revisions: Vec<RevisionRef>,
+}
+
+#[derive(Debug, Parser)]
+struct Options {
     #[clap(long)]
     out_dir: PathBuf,
-    #[clap(long)]
-    revision: String,
     #[clap(long = "job-type-re")]
     job_type_name_regex: Option<Regex>,
     #[clap(long = "artifact")]
     artifact_names: Vec<String>,
     #[clap(long = "max-parallel", default_value = "10")]
     max_parallel_artifact_downloads: NonZeroU8,
-    #[clap(long = "project", default_value = "try")]
-    project_name: String,
     #[clap(long, default_value = "https://treeherder.mozilla.org")]
     treeherder_host: Url,
     #[clap(long, default_value = "https://firefox-ci-tc.services.mozilla.com")]
     taskcluster_host: Url,
 }
 
+#[derive(Clone, Debug)]
+struct RevisionRef {
+    project: String,
+    hash: String,
+}
+
+impl FromStr for RevisionRef {
+    type Err = &'static str;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        s.split_once(':')

Check warning on line 168 in src/main.rs (GitHub Actions / cargo-clippy; reported twice each on macos-latest, ubuntu-latest, and windows-latest): unnecessary closure used to substitute value for `Option::None`

+            .map(|(project, hash)| Self {
+                project: project.to_owned(),
+                hash: hash.to_owned(),
+            })
+            .ok_or_else(|| {
+                "no dividing colon found; expected revision ref. of the form <project>:<hash>"
+            })
+    }
+}
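
The cargo-clippy annotation above is the `unnecessary_lazy_evaluations` lint: the error value is a constant `&str`, so the closure passed to `ok_or_else` buys nothing over the eager form. A minimal sketch of the change clippy suggests (not part of this commit):

    s.split_once(':')
        .map(|(project, hash)| Self {
            project: project.to_owned(),
            hash: hash.to_owned(),
        })
        .ok_or("no dividing colon found; expected revision ref. of the form <project>:<hash>")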

 #[tokio::main]
 async fn main() {
-    env_logger::init();
+    env_logger::builder()
+        .filter_level(log::LevelFilter::Info)
+        .parse_default_env()
+        .init();
+
+    let Cli { options, revisions } = Cli::parse();
 
-    let Cli {
+    let client = Client::new();
+
+    for rev_ref in revisions {
+        get_artifacts_for_revision(&client, &options, &rev_ref).await
+    }
+}
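
As restructured here, `main` defaults the logger to `info` level (still overridable through `RUST_LOG`, which `parse_default_env` reads) and loops over any number of positional `<project>:<hash>` revision refs, fetching them one after another. A hypothetical invocation, assuming the binary is called `treeherder-dl` (the name, hashes, and artifact are placeholders, not taken from this commit):

    treeherder-dl --out-dir ./logs --artifact wptreport.json try:abcdef123456 mozilla-central:0123456789ab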

+async fn get_artifacts_for_revision(client: &Client, options: &Options, revision: &RevisionRef) {
+    let Options {
         out_dir,
-        revision,
         job_type_name_regex,
         artifact_names,
         max_parallel_artifact_downloads,
-        project_name,
         treeherder_host,
         taskcluster_host,
-    } = Cli::parse();
+    } = options;
+    let RevisionRef {
+        project: project_name,
+        hash: revision,
+    } = revision;
 
-    let client = Client::new();
+    log::info!("fetching for revision(s): {:?}", [&revision]);
 
-    let revision = client
+    let Revision {
+        meta: RevisionMeta { count },
+        mut results,
+    } = client
         .get(format!(
             "{treeherder_host}api/project/{project_name}/push/?revision={revision}"
         ))
@@ -178,10 +222,6 @@ async fn main() {
         .await
         .unwrap();
 
-    log::info!("fetching for revision(s): {:?}", [&revision]);
-
-    let Revision { meta, mut results } = revision;
-    let RevisionMeta { count } = meta;
     assert!(results.len() == usize::try_from(count).unwrap());
     if count > 1 {
         log::warn!("more than one `result` found for specified push");
@@ -263,80 +303,85 @@ async fn main() {
     progress_bar.tick(); // Force the progress bar to show now, rather than waiting until first
                          // completion of a download.
 
-    fs::remove_dir_all(&out_dir)
-        .or_else(|e| match e.kind() {
-            io::ErrorKind::NotFound => Ok(()),
-            e => Err(e),
-        })
-        .unwrap();
-
     let task_counts = Arc::new(Mutex::new(BTreeMap::new()));
-    let max_parallel_artifact_downloads = usize::from(max_parallel_artifact_downloads.get());
-    progress_bar
-        .wrap_stream(artifacts)
-        .for_each_concurrent(max_parallel_artifact_downloads, |(job, artifact_name)| {
-            let client = &client;
-            let out_dir = &out_dir;
-            let task_counts = &task_counts;
-            let taskcluster_host = &taskcluster_host;
-            let progress_bar = progress_bar.clone();
-            async move {
-                let Job {
-                    job_group_symbol,
-                    job_type_symbol,
-                    job_type_name,
-                    platform,
-                    task_id,
-                    platform_option,
-                    ..
-                } = job;
-
-                let job_path =
-                    format!("{platform}/{platform_option}/{job_group_symbol}/{job_type_symbol}");
-
-                let this_task_idx: u32;
-                {
-                    let mut task_counts = task_counts.lock().unwrap();
-                    let task_count = task_counts.entry(job_path.clone()).or_insert(0);
-                    this_task_idx = *task_count;
-                    *task_count += 1;
-                }
-
-                let artifact =
-                    match get_artifact(client, taskcluster_host, task_id, artifact_name).await {
-                        Ok(bytes) => bytes,
-                        Err(code) => {
-                            progress_bar.suspend(|| {
-                                log::error!(
-                                    "got unexpected response {code} with request for task \
-                                    {task_id:?} ({job_type_name:?}, index {this_task_idx}), \
-                                    artifact {artifact_name:?}; skipping download",
-                                );
-                            });
-                            return;
-                        }
-                    };
-
-                let local_artifact_path = {
-                    let mut path = out_dir.join(job_path);
-                    path.push(&this_task_idx.to_string());
-                    path.push(artifact_name);
-                    path
-                };
-
-                {
-                    let parent_dir = local_artifact_path.parent().unwrap();
-                    fs::create_dir_all(parent_dir).unwrap_or_else(|e| {
-                        panic!("failed to create `{}`: {e}", parent_dir.display())
-                    });
-                }
-                fs::write(&local_artifact_path, artifact).unwrap_or_else(|e| {
-                    panic!(
-                        "failed to write artifact `{}`: {e}",
-                        local_artifact_path.display()
-                    )
-                });
-            }
-        })
-        .await;
+    let artifacts = artifacts.then(|(job, artifact_name)| {
+        let client = &client;
+        let out_dir = &out_dir;
+        let task_counts = &task_counts;
+        let taskcluster_host = &taskcluster_host;
+        let progress_bar = progress_bar.clone();
+        async move {
+            let Job {
+                job_group_symbol,
+                job_type_symbol,
+                job_type_name,
+                platform,
+                task_id,
+                platform_option,
+                ..
+            } = job;
+
+            let job_path = format!(
+                "{revision}/{platform}/{platform_option}/{job_group_symbol}/{job_type_symbol}"
+            );
+
+            let this_task_idx: u32;
+            {
+                let mut task_counts = task_counts.lock().unwrap();
+                let task_count = task_counts.entry(job_path.clone()).or_insert(0);
+                this_task_idx = *task_count;
+                *task_count += 1;
+            }
+
+            let local_artifact_path = {
+                let mut path = out_dir.join(job_path);
+                path.push(&this_task_idx.to_string());
+                path.push(artifact_name);
+                path
+            };
+
+            if local_artifact_path.is_file() {
+                progress_bar.suspend(|| {
+                    log::info!(
+                        "skipping file that already appears to be downloaded: {}",
+                        local_artifact_path.display()
+                    );
+                });
+                return;
+            }
+
+            let artifact =
+                match get_artifact(client, taskcluster_host, task_id, artifact_name).await {
+                    Ok(bytes) => bytes,
+                    Err(code) => {
+                        progress_bar.suspend(|| {
+                            log::error!(
+                                "got unexpected response {code} with request for task \
+                                {task_id:?} ({job_type_name:?}, index {this_task_idx}), \
+                                artifact {artifact_name:?}; skipping download",
+                            );
+                        });
+                        return;
+                    }
+                };
+
+            {
+                let parent_dir = local_artifact_path.parent().unwrap();
+                fs::create_dir_all(parent_dir)
+                    .unwrap_or_else(|e| panic!("failed to create `{}`: {e}", parent_dir.display()));
+            }
+            fs::write(&local_artifact_path, artifact).unwrap_or_else(|e| {
+                panic!(
+                    "failed to write artifact `{}`: {e}",
+                    local_artifact_path.display()
+                )
+            });
+        }
+    });
+    let max_parallel_artifact_downloads = usize::from(max_parallel_artifact_downloads.get());
+    artifacts
+        .for_each_concurrent(max_parallel_artifact_downloads, |()| async {
+            progress_bar.inc(1)
+        })
+        .await;
 }
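
A note on the new download pipeline: `StreamExt::then` awaits each item's future as part of polling the stream itself, so at most one download future is in flight at a time, and the trailing `for_each_concurrent` only overlaps the (cheap) progress-bar increments. The usual shape for resolving up to N futures at once is `map` plus `buffer_unordered`; a minimal, self-contained sketch (assuming the `futures` and `tokio` crates, with placeholder work standing in for the real downloads):

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        stream::iter(1..=5u32)
            // `map` yields a stream of not-yet-started futures...
            .map(|n| async move {
                // ...each standing in for a `get_artifact` call plus file writes.
                n * 2
            })
            // ...and `buffer_unordered` drives up to 3 of them at once,
            // yielding results in completion order.
            .buffer_unordered(3)
            .for_each(|result| async move {
                println!("got {result}");
            })
            .await;
    }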
