diff --git a/.github/workflows/integrations.yml b/.github/workflows/integrations.yml index 4359e135f..658e004e1 100644 --- a/.github/workflows/integrations.yml +++ b/.github/workflows/integrations.yml @@ -64,10 +64,6 @@ jobs: - name: Compile Contract run: cargo build -p mpc-contract --target wasm32-unknown-unknown --release - - name: Build MPC Recovery Binary Locally - run: | - cargo build -p mpc-recovery --release - - name: Test run: cargo test -p mpc-recovery-integration-tests mpc --jobs 1 -- --test-threads 1 env: diff --git a/.gitignore b/.gitignore index 6709739cf..7d7a7b9de 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ /target .direnv .DS_Store + +flamegraph*.svg tmp diff --git a/Cargo.lock b/Cargo.lock index e8f966cfa..7d45ba9fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3674,6 +3674,7 @@ dependencies = [ "near-primitives 0.17.0", "near-units", "near-workspaces", + "nix", "once_cell", "rand 0.7.3", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index fc851a8cf..58403271a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,7 @@ members = [ "load-tests", "test-oidc-provider", ] + +[profile.flamegraph] +inherits = "release" +debug = true diff --git a/integration-tests/Cargo.toml b/integration-tests/Cargo.toml index f572526ab..c1e1de55c 100644 --- a/integration-tests/Cargo.toml +++ b/integration-tests/Cargo.toml @@ -25,6 +25,7 @@ near-fetch = "0.0.12" near-jsonrpc-client = "0.6" near-primitives = "0.17" near-units = "0.2.0" +nix = { version = "0.27", features = ["signal"] } once_cell = "1" rand = "0.7" serde = "1" @@ -50,3 +51,4 @@ mpc-contract = { path = "../contract" } [features] default = [] docker-test = [] +flamegraph = ["mpc-recovery/disable-open-telemetry"] diff --git a/integration-tests/README.md b/integration-tests/README.md index 4ed6d1611..412ccc010 100644 --- a/integration-tests/README.md +++ b/integration-tests/README.md @@ -16,12 +16,6 @@ Build OIDC Provider test image docker build -t near/test-oidc-provider ./test-oidc-provider ``` -Now, build mpc-recovery from the project's root: - -```BASH -cargo build --release -``` - Then run the integration tests: ```BASH @@ -44,6 +38,22 @@ Finally, run the integration tests with the built docker image: cargo test -p mpc-recovery-integration-tests --features docker-test ``` +## Profiling: Flamegraphs + +To profile code and get a flamegraph, run the following: + +```sh +cargo flamegraph --root --profile flamegraph --test lib +``` + +Or for a singular test like `test_basic_action`: + +```sh +cargo flamegraph --root --profile flamegraph --test lib -- test_basic_action +``` + +This will generate a `flamegraph.svg`. Open this on a browser and inspect each of the callstacks. + ## FAQ ### I want to run a test, but keep the docker containers from being destroyed diff --git a/integration-tests/src/env/local.rs b/integration-tests/src/env/local.rs index 57c5ac7c1..8fcef2054 100644 --- a/integration-tests/src/env/local.rs +++ b/integration-tests/src/env/local.rs @@ -1,12 +1,12 @@ use aes_gcm::aead::consts::U32; use aes_gcm::aead::generic_array::GenericArray; -use async_process::Child; use mpc_recovery::firewall::allowed::DelegateActionRelayer; use mpc_recovery::logging; use mpc_recovery::relayer::NearRpcAndRelayerClient; use multi_party_eddsa::protocols::ExpandedKeyPair; use crate::env::{LeaderNodeApi, SignerNodeApi}; +use crate::mpc::{self, NodeProcess}; use crate::util; pub struct SignerNode { @@ -19,8 +19,7 @@ pub struct SignerNode { gcp_datastore_url: String, // process held so it's not dropped. Once dropped, process will be killed. - #[allow(unused)] - process: Child, + _process: NodeProcess, } impl SignerNode { @@ -51,11 +50,10 @@ impl SignerNode { gcp_datastore_url: Some(ctx.datastore.local_address.clone()), jwt_signature_pk_url: ctx.oidc_provider.jwt_pk_local_url.clone(), logging_options: logging::Options::default(), - } - .into_str_args(); + }; - let sign_node_id = format!("sign/{node_id}"); - let process = util::spawn_mpc(ctx.release, &sign_node_id, &args)?; + let sign_node_id = format!("sign-{node_id}"); + let process = mpc::spawn(ctx.release, &sign_node_id, args).await?; let address = format!("http://127.0.0.1:{web_port}"); tracing::info!("Signer node is starting at {}", address); util::ping_until_ok(&address, 60).await?; @@ -69,7 +67,7 @@ impl SignerNode { cipher_key: *cipher_key, gcp_project_id: ctx.gcp_project_id.clone(), gcp_datastore_url: ctx.datastore.local_address.clone(), - process, + _process: process, }) } @@ -92,8 +90,7 @@ pub struct LeaderNode { relayer_url: String, // process held so it's not dropped. Once dropped, process will be killed. - #[allow(unused)] - process: Child, + _process: NodeProcess, } impl LeaderNode { @@ -134,10 +131,9 @@ impl LeaderNode { gcp_datastore_url: Some(ctx.datastore.local_address.clone()), jwt_signature_pk_url: ctx.oidc_provider.jwt_pk_local_url.clone(), logging_options: logging::Options::default(), - } - .into_str_args(); + }; - let process = util::spawn_mpc(ctx.release, "leader", &args)?; + let process = mpc::spawn(ctx.release, "leader", args).await?; let address = format!("http://127.0.0.1:{web_port}"); tracing::info!("Leader node container is starting at {}", address); util::ping_until_ok(&address, 60).await?; @@ -147,7 +143,7 @@ impl LeaderNode { address, near_rpc: ctx.relayer_ctx.sandbox.local_address.clone(), relayer_url: ctx.relayer_ctx.relayer.local_address.clone(), - process, + _process: process, }) } diff --git a/integration-tests/src/env/mod.rs b/integration-tests/src/env/mod.rs index 91cb9f651..01526deff 100644 --- a/integration-tests/src/env/mod.rs +++ b/integration-tests/src/env/mod.rs @@ -101,6 +101,12 @@ pub struct Context<'a> { } pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> { + let release = true; + #[cfg(not(feature = "flamegraph"))] + if !crate::mpc::build(release).await?.success() { + anyhow::bail!("failed to prebuild MPC service"); + } + let gcp_project_id = GCP_PROJECT_ID; let docker_network = NETWORK; docker_client.create_network(docker_network).await?; @@ -124,7 +130,7 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> gcp_project_id: gcp_project_id.to_string(), audience_id: FIREBASE_AUDIENCE_ID.to_string(), issuer: ISSUER.to_string(), - release: true, + release, relayer_ctx, datastore, oidc_provider, diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs index 16d0ac126..3e4861d82 100644 --- a/integration-tests/src/lib.rs +++ b/integration-tests/src/lib.rs @@ -11,6 +11,7 @@ use near_workspaces::{ use crate::env::containers; pub mod env; +pub mod mpc; pub mod multichain; pub mod sandbox; pub mod util; diff --git a/integration-tests/src/mpc.rs b/integration-tests/src/mpc.rs new file mode 100644 index 000000000..259f1c629 --- /dev/null +++ b/integration-tests/src/mpc.rs @@ -0,0 +1,103 @@ +use std::path::{Path, PathBuf}; + +use anyhow::Context; +use async_process::{Child, Command, ExitStatus, Stdio}; +use tokio::runtime::Runtime; + +use mpc_recovery::Cli; + +const PACKAGE: &str = "mpc-recovery"; +const PACKAGE_MULTICHAIN: &str = "mpc-recovery-node"; + +/// NodeProcess holds onto the respective handles such that on drop, it will clean +/// the running process, task, or thread. +pub enum NodeProcess { + Subprocess(async_process::Child), + Threaded(std::thread::JoinHandle>), +} + +pub fn executable(release: bool, executable: &str) -> Option { + let executable = target_dir()? + .join(if release { "release" } else { "debug" }) + .join(executable); + Some(executable) +} + +fn target_dir() -> Option { + let mut out_dir = Path::new(std::env!("OUT_DIR")); + loop { + if out_dir.ends_with("target") { + break Some(out_dir.to_path_buf()); + } + + match out_dir.parent() { + Some(parent) => out_dir = parent, + None => break None, // We've reached the root directory and didn't find "target" + } + } +} + +pub async fn build(release: bool) -> anyhow::Result { + let mut cmd = Command::new("cargo"); + cmd.arg("build") + .arg("--package") + .arg(PACKAGE) + .envs(std::env::vars()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()); + + if release { + cmd.arg("--release"); + } + + Ok(cmd.spawn()?.status().await?) +} + +pub async fn spawn(release: bool, node: &str, cli: Cli) -> anyhow::Result { + if cfg!(feature = "flamegraph") { + let handle: std::thread::JoinHandle> = std::thread::spawn(|| { + let rt = Runtime::new()?; + rt.block_on(async move { + mpc_recovery::run(cli).await?; + anyhow::Result::<(), anyhow::Error>::Ok(()) + }) + .unwrap(); + Ok(()) + }); + + return Ok(NodeProcess::Threaded(handle)); + } + + let executable = executable(release, PACKAGE) + .with_context(|| format!("could not find target dir while starting {node} node"))?; + let child = Command::new(executable) + .args(cli.into_str_args()) + .env("RUST_LOG", "mpc_recovery=INFO") + .envs(std::env::vars()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .kill_on_drop(true) + .spawn() + .with_context(|| format!("failed to execute {node} node"))?; + + Ok(NodeProcess::Subprocess(child)) +} + +pub fn spawn_multichain( + release: bool, + node: &str, + cli: mpc_recovery_node::cli::Cli, +) -> anyhow::Result { + let executable = executable(release, PACKAGE_MULTICHAIN) + .with_context(|| format!("could not find target dir while starting {node} node"))?; + + Command::new(&executable) + .args(cli.into_str_args()) + .env("RUST_LOG", "mpc_recovery_node=INFO") + .envs(std::env::vars()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .kill_on_drop(true) + .spawn() + .with_context(|| format!("failed to run {node} node: {}", executable.display())) +} diff --git a/integration-tests/src/multichain/local.rs b/integration-tests/src/multichain/local.rs index 999969c35..d1fe3a733 100644 --- a/integration-tests/src/multichain/local.rs +++ b/integration-tests/src/multichain/local.rs @@ -1,4 +1,4 @@ -use crate::util; +use crate::{mpc, util}; use async_process::Child; use near_workspaces::AccountId; @@ -22,18 +22,17 @@ impl Node { account_sk: &near_workspaces::types::SecretKey, ) -> anyhow::Result { let web_port = util::pick_unused_port().await?; - let args = mpc_recovery_node::cli::Cli::Start { + let cli = mpc_recovery_node::cli::Cli::Start { node_id: node_id.into(), near_rpc: ctx.sandbox.local_address.clone(), mpc_contract_id: ctx.mpc_contract.id().clone(), account: account.clone(), account_sk: account_sk.to_string().parse()?, web_port, - } - .into_str_args(); + }; let mpc_node_id = format!("multichain/{node_id}"); - let process = util::spawn_mpc_multichain(ctx.release, &mpc_node_id, &args)?; + let process = mpc::spawn_multichain(ctx.release, &mpc_node_id, cli)?; let address = format!("http://127.0.0.1:{web_port}"); tracing::info!("node is starting at {}", address); util::ping_until_ok(&address, 60).await?; diff --git a/integration-tests/src/util.rs b/integration-tests/src/util.rs index 64c6b60ae..207d6ff95 100644 --- a/integration-tests/src/util.rs +++ b/integration-tests/src/util.rs @@ -1,19 +1,15 @@ +use std::{ + fs::{self, File}, + io::Write, +}; + use crate::containers::RelayerConfig; use anyhow::Context; -use async_process::{Child, Command, Stdio}; use hyper::{Body, Client, Method, Request, StatusCode, Uri}; use near_workspaces::{types::SecretKey, AccountId}; use serde::{Deserialize, Serialize}; -use std::{ - fs::{self, File}, - io::Write, - path::{Path, PathBuf}, -}; use toml::Value; -const EXECUTABLE: &str = "mpc-recovery"; -const EXECUTABLE_MULTICHAIN: &str = "mpc-recovery-node"; - pub async fn post( uri: U, request: Req, @@ -231,54 +227,3 @@ pub async fn ping_until_ok(addr: &str, timeout: u64) -> anyhow::Result<()> { .await?; Ok(()) } - -pub fn target_dir() -> Option { - let mut out_dir = Path::new(std::env!("OUT_DIR")); - loop { - if out_dir.ends_with("target") { - break Some(out_dir.to_path_buf()); - } - - match out_dir.parent() { - Some(parent) => out_dir = parent, - None => break None, // We've reached the root directory and didn't find "target" - } - } -} - -pub fn executable(release: bool, executable: &str) -> Option { - let executable = target_dir()? - .join(if release { "release" } else { "debug" }) - .join(executable); - Some(executable) -} - -pub fn spawn_mpc(release: bool, node: &str, args: &[String]) -> anyhow::Result { - let executable = executable(release, EXECUTABLE) - .with_context(|| format!("could not find target dir while starting {node} node"))?; - - Command::new(&executable) - .args(args) - .env("RUST_LOG", "mpc_recovery=INFO") - .envs(std::env::vars()) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()) - .kill_on_drop(true) - .spawn() - .with_context(|| format!("failed to run {node} node: {}", executable.display())) -} - -pub fn spawn_mpc_multichain(release: bool, node: &str, args: &[String]) -> anyhow::Result { - let executable = executable(release, EXECUTABLE_MULTICHAIN) - .with_context(|| format!("could not find target dir while starting {node} node"))?; - - Command::new(&executable) - .args(args) - .env("RUST_LOG", "mpc_recovery_node=INFO") - .envs(std::env::vars()) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()) - .kill_on_drop(true) - .spawn() - .with_context(|| format!("failed to run {node} node: {}", executable.display())) -} diff --git a/mpc-recovery/Cargo.toml b/mpc-recovery/Cargo.toml index 04496f83f..604f6ffb1 100644 --- a/mpc-recovery/Cargo.toml +++ b/mpc-recovery/Cargo.toml @@ -59,3 +59,7 @@ sha2 = "0.9.9" [dev-dependencies] rsa = "0.8.2" + +[features] +default = [] +disable-open-telemetry = [] diff --git a/mpc-recovery/src/lib.rs b/mpc-recovery/src/lib.rs index 7ed1815aa..0f83dc28d 100644 --- a/mpc-recovery/src/lib.rs +++ b/mpc-recovery/src/lib.rs @@ -212,14 +212,13 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { jwt_signature_pk_url, logging_options, } => { - let _subscriber_guard = logging::default_subscriber_with_opentelemetry( + let _subscriber_guard = logging::subscribe_global( EnvFilter::from_default_env(), &logging_options, env.clone(), "leader".to_string(), ) - .await - .global(); + .await; let gcp_service = GcpService::new(env.clone(), gcp_project_id, gcp_datastore_url).await?; let account_creator_signer = @@ -258,14 +257,13 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { jwt_signature_pk_url, logging_options, } => { - let _subscriber_guard = logging::default_subscriber_with_opentelemetry( + let _subscriber_guard = logging::subscribe_global( EnvFilter::from_default_env(), &logging_options, env.clone(), node_id.to_string(), ) - .await - .global(); + .await; let gcp_service = GcpService::new(env.clone(), gcp_project_id, gcp_datastore_url).await?; let oidc_providers = OidcProviderList { @@ -309,14 +307,13 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { gcp_datastore_url, logging_options, } => { - let _subscriber_guard = logging::default_subscriber_with_opentelemetry( + let _subscriber_guard = logging::subscribe_global( EnvFilter::from_default_env(), &logging_options, env.clone(), node_id.to_string(), ) - .await - .global(); + .await; let gcp_service = GcpService::new( env.clone(), gcp_project_id.clone(), diff --git a/mpc-recovery/src/logging.rs b/mpc-recovery/src/logging.rs index 7da744f38..87ea39563 100644 --- a/mpc-recovery/src/logging.rs +++ b/mpc-recovery/src/logging.rs @@ -304,3 +304,26 @@ pub async fn default_subscriber_with_opentelemetry( writer_guard: Some(writer_guard), } } + +pub enum FeatureGuard { + Noop, + Default(DefaultSubscriberGuard), +} + +pub async fn subscribe_global( + env_filter: EnvFilter, + options: &Options, + env: String, + node_id: String, +) -> FeatureGuard { + if cfg!(feature = "disable-open-telemetry") { + FeatureGuard::Noop + } else { + let subscriber_guard = + default_subscriber_with_opentelemetry(env_filter, options, env, node_id) + .await + .global(); + + FeatureGuard::Default(subscriber_guard) + } +} diff --git a/mpc-recovery/src/oauth.rs b/mpc-recovery/src/oauth.rs index e50ec2a9d..72d74fae2 100644 --- a/mpc-recovery/src/oauth.rs +++ b/mpc-recovery/src/oauth.rs @@ -18,6 +18,7 @@ pub async fn verify_oidc_token( let public_keys = get_pagoda_firebase_public_keys(client, jwt_signature_pk_url) .await .map_err(|e| anyhow::anyhow!("failed to get Firebase public key: {e}"))?; + tracing::info!("verify_oidc_token firebase public keys: {public_keys:?}"); let mut last_occured_error = anyhow::anyhow!("Unexpected error. Firebase public keys not found");