diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml index 7fa884fe1..f6085cdd3 100644 --- a/.github/workflows/unit.yml +++ b/.github/workflows/unit.yml @@ -100,5 +100,6 @@ jobs: # RUSTSEC-2024-0344 and RUSTSEC-2022-0093 are both to do with ed25519 signatures in near-sdk, we don't sign things with this library so it's safe # RUSTSEC-2022-0054 wee-alloc is unmaintained, it's fine for now because we barely use an allocator and the contracts are short lived, but we should find a replacement/use the default allocator # RUSTSEC-2021-0145 atty can do an unallocated read with a custom allocator in windows. We don't run this in windows and we don't use a custom allocator. + # RUSTSEC-2024-0399 per the advisory, this does not affect us since we do not use Acceptor run: | - cargo audit --ignore RUSTSEC-2022-0093 --ignore RUSTSEC-2024-0344 --ignore RUSTSEC-2022-0054 --ignore RUSTSEC-2021-0145 + cargo audit --ignore RUSTSEC-2022-0093 --ignore RUSTSEC-2024-0344 --ignore RUSTSEC-2022-0054 --ignore RUSTSEC-2021-0145 --ignore RUSTSEC-2024-0399 diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 83131e2df..1cd0d295c 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -1,6 +1,7 @@ use crate::config::{Config, LocalConfig, NetworkConfig, OverrideConfig}; use crate::gcp::GcpService; use crate::protocol::{MpcSignProtocol, SignQueue}; +use crate::storage::app_data_storage; use crate::{http_client, indexer, mesh, storage, web}; use clap::Parser; use deadpool_redis::Runtime; @@ -195,14 +196,6 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { .build()?; let gcp_service = rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?; - let (indexer_handle, indexer) = indexer::run( - &indexer_options, - &mpc_contract_id, - &account_id, - &sign_queue, - &gcp_service, - &rt, - )?; let key_storage = storage::secret_storage::init(Some(&gcp_service), &storage_options, &account_id); @@ -214,6 +207,23 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let triple_storage = storage::triple_storage::init(&redis_pool, &account_id); let presignature_storage = storage::presignature_storage::init(&redis_pool, &account_id); + let app_data_storage = app_data_storage::init(&redis_pool, &account_id); + + let mut rpc_client = near_fetch::Client::new(&near_rpc); + if let Some(referer_param) = client_header_referer { + let client_headers = rpc_client.inner_mut().headers_mut(); + client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); + } + tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); + + let (indexer_handle, indexer) = indexer::run( + &indexer_options, + &mpc_contract_id, + &account_id, + &sign_queue, + app_data_storage, + rpc_client.clone(), + )?; let sign_sk = sign_sk.unwrap_or_else(|| account_sk.clone()); let my_address = my_address @@ -229,13 +239,6 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let (sender, receiver) = mpsc::channel(16384); tracing::info!(%my_address, "address detected"); - let mut rpc_client = near_fetch::Client::new(&near_rpc); - if let Some(referer_param) = client_header_referer { - let client_headers = rpc_client.inner_mut().headers_mut(); - client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); - } - - tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); let (protocol, protocol_state) = MpcSignProtocol::init( my_address,
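Note on the `cli.rs` hunk above: the RPC client is now constructed before the indexer starts, so `indexer::run` can receive a clone of the same client the protocol later uses. A minimal sketch of that construction, using only the `near_fetch` and `http` calls already visible in this diff (`build_rpc_client` is a hypothetical helper; the node inlines this logic in `run`):

```rust
use http::header::REFERER;

/// Hypothetical helper mirroring the inlined logic in `cli.rs`: build the RPC
/// client once, optionally tagging every request with a Referer header.
fn build_rpc_client(
    near_rpc: &str,
    client_header_referer: Option<String>,
) -> anyhow::Result<near_fetch::Client> {
    let mut rpc_client = near_fetch::Client::new(near_rpc);
    if let Some(referer_param) = client_header_referer {
        let client_headers = rpc_client.inner_mut().headers_mut();
        // `?` instead of the `.unwrap()` used above, since parsing a header value is fallible.
        client_headers.insert(REFERER, referer_param.parse()?);
    }
    tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
    Ok(rpc_client)
}
```

Hoisting this ahead of `indexer::run` is what allows the indexer's `update_last_processed_block` (added later in this diff) to make `view_block` calls through the shared client.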
diff --git a/chain-signatures/node/src/gcp/error.rs b/chain-signatures/node/src/gcp/error.rs index 8818a07f3..42463611d 100644 --- a/chain-signatures/node/src/gcp/error.rs +++ b/chain-signatures/node/src/gcp/error.rs @@ -19,25 +19,3 @@ pub enum SecretStorageError { #[error("(de)serialization error: {0}")] SerdeError(#[from] serde_json::Error), } - -#[derive(thiserror::Error, Debug)] -pub enum DatastoreStorageError { - #[error("GCP error: {0}")] - GcpError(#[from] google_datastore1::Error), - #[error("IO error: {0}")] - IoError(#[from] std::io::Error), - #[error("(de)serialization error: {0}")] - SerdeError(#[from] serde_json::Error), - #[error("datastore value conversion error: {0}")] - ConvertError(ConvertError), - #[error("fetch_entities error: `{0}`")] - FetchEntitiesError(String), - #[error("could not find entity: {0}")] - EntityNotFound(String), -} - -impl From for DatastoreStorageError { - fn from(err: ConvertError) -> Self { - DatastoreStorageError::ConvertError(err) - } -} diff --git a/chain-signatures/node/src/gcp/mod.rs b/chain-signatures/node/src/gcp/mod.rs index 5293d9d57..0fa62b972 100644 --- a/chain-signatures/node/src/gcp/mod.rs +++ b/chain-signatures/node/src/gcp/mod.rs @@ -1,17 +1,9 @@ pub mod error; -pub mod value; -use self::value::{FromValue, IntoValue}; -use crate::gcp::error::DatastoreStorageError; use crate::storage; -use google_datastore1::api::Filter; -use google_datastore1::api::{ - CommitRequest, Entity, EntityResult, Key, KindExpression, LookupRequest, Mutation, PathElement, - Query, RunQueryRequest, -}; +use google_datastore1::api::Key; use google_datastore1::oauth2::AccessTokenAuthenticator; -use google_datastore1::Datastore; use google_secretmanager1::api::{AddSecretVersionRequest, SecretPayload}; use google_secretmanager1::oauth2::authenticator::ApplicationDefaultCredentialsTypes; use google_secretmanager1::oauth2::{ @@ -79,16 +71,6 @@ impl SecretManagerService { } } -#[derive(Clone)] -pub struct DatastoreService { - datastore: Datastore>, - project_id: String, - env: String, - is_emulator: bool, -} - -pub type DatastoreResult = std::result::Result; - pub trait Keyable: KeyKind { fn key(&self) -> Key; } @@ -97,268 +79,9 @@ pub trait KeyKind { fn kind() -> String; } -impl DatastoreService { - pub fn is_emulator(&self) -> bool { - self.is_emulator - } - - #[tracing::instrument(level = "debug", skip_all, fields(key = name_key.to_string()))] - pub async fn get( - &self, - name_key: K, - ) -> DatastoreResult { - let request = LookupRequest { - keys: Some(vec![Key { - path: Some(vec![PathElement { - // We can't create multiple datastore databases in GCP, so we have to suffix - // type kinds with env (`dev`, `prod`). 
- kind: Some(format!("{}-{}", T::kind(), self.env)), - name: Some(name_key.to_string()), - id: None, - }]), - partition_id: None, - }]), - read_options: None, - database_id: Some("".to_string()), - }; - let (_, response) = self - .datastore - .projects() - .lookup(request, &self.project_id) - .doit() - .await - .map_err(|e| { - tracing::error!(%e, "failed to lookup entity in data store"); - e - })?; - match response - .found - .and_then(|mut results| results.pop()) - .and_then(|result| result.entity) - { - Some(found_entity) => Ok(T::from_value(found_entity.into_value())?), - None => Err(DatastoreStorageError::EntityNotFound(name_key.to_string())), - } - } - - #[tracing::instrument(level = "debug", skip_all)] - pub async fn insert(&self, value: T) -> DatastoreResult<()> { - let mut entity = Entity::from_value(value.into_value())?; - let path_element = entity - .key - .as_mut() - .and_then(|k| k.path.as_mut()) - .and_then(|p| p.first_mut()); - if let Some(path_element) = path_element { - // We can't create multiple datastore databases in GCP, so we have to suffix - // type kinds with env (`dev`, `prod`). - path_element.kind = Some(format!("{}-{}", T::kind(), self.env)) - } - - let request = CommitRequest { - database_id: Some("".to_string()), - mode: Some(String::from("NON_TRANSACTIONAL")), - mutations: Some(vec![Mutation { - insert: Some(entity), - delete: None, - update: None, - base_version: None, - upsert: None, - update_time: None, - }]), - single_use_transaction: None, - transaction: None, - }; - let (_, _) = self - .datastore - .projects() - .commit(request, &self.project_id) - .doit() - .await - .map_err(|e| { - tracing::error!(%e, "failed to insert entity to data store"); - e - })?; - Ok(()) - } - - #[tracing::instrument(level = "debug", skip_all)] - pub async fn update(&self, value: T) -> DatastoreResult<()> { - let mut entity = Entity::from_value(value.into_value())?; - let path_element = entity - .key - .as_mut() - .and_then(|k| k.path.as_mut()) - .and_then(|p| p.first_mut()); - if let Some(path_element) = path_element { - // We can't create multiple datastore databases in GCP, so we have to suffix - // type kinds with env (`dev`, `prod`). - path_element.kind = Some(format!("{}-{}", T::kind(), self.env)) - } - - let request = CommitRequest { - database_id: Some("".to_string()), - mode: Some(String::from("NON_TRANSACTIONAL")), - mutations: Some(vec![Mutation { - insert: None, - delete: None, - update: Some(entity), - base_version: None, - upsert: None, - update_time: None, - }]), - single_use_transaction: None, - transaction: None, - }; - let (_, _) = self - .datastore - .projects() - .commit(request, &self.project_id) - .doit() - .await - .map_err(|e| { - tracing::error!(%e, "failed to update entity in data store"); - e - })?; - - Ok(()) - } - - pub async fn upsert(&self, value: T) -> DatastoreResult<()> { - let mut entity = Entity::from_value(value.into_value())?; - let path_element = entity - .key - .as_mut() - .and_then(|k| k.path.as_mut()) - .and_then(|p| p.first_mut()); - if let Some(path_element) = path_element { - // We can't create multiple datastore databases in GCP, so we have to suffix - // type kinds with env (`dev`, `prod`). 
- path_element.kind = Some(format!("{}-{}", T::kind(), self.env)) - } - - let request = CommitRequest { - database_id: Some("".to_string()), - mode: Some(String::from("NON_TRANSACTIONAL")), - mutations: Some(vec![Mutation { - insert: None, - delete: None, - update: None, - base_version: None, - upsert: Some(entity), - update_time: None, - }]), - single_use_transaction: None, - transaction: None, - }; - - let (_, _) = self - .datastore - .projects() - .commit(request, &self.project_id) - .doit() - .await - .map_err(|e| { - tracing::error!(%e, "failed to upsert entity in data store"); - e - })?; - - Ok(()) - } - - pub async fn fetch_entities( - &self, - filter: Option, - ) -> DatastoreResult> { - let kind: String = format!("{}-{}", T::kind(), self.env); - let req = RunQueryRequest { - database_id: Some("".to_string()), - partition_id: Default::default(), - read_options: Default::default(), - query: Some(Query { - projection: None, - kind: Some(vec![KindExpression { name: Some(kind) }]), - filter, - order: None, - distinct_on: Some(vec![]), - start_cursor: None, - end_cursor: None, - offset: None, - limit: None, - }), - gql_query: None, - }; - let (_hyper_resp, query_resp) = self - .datastore - .projects() - .run_query(req, &self.project_id) - .doit() - .await - .map_err(|e| { - tracing::error!(%e, "failed to fetch entities from data store"); - e - })?; - let batch = query_resp.batch.ok_or_else(|| { - DatastoreStorageError::FetchEntitiesError( - "Could not retrieve batch while fetching entities".to_string(), - ) - })?; - - // NOTE: if entity_results is None, we return an empty Vec since the fetch query - // could not find any entities in the DB. - Ok(batch.entity_results.unwrap_or_default()) - } - - #[tracing::instrument(level = "debug", skip_all)] - pub async fn delete(&self, keyable: T) -> DatastoreResult<()> { - self.delete_many(&[keyable]).await - } - - #[tracing::instrument(level = "debug", skip_all)] - pub async fn delete_many(&self, keyables: &[T]) -> DatastoreResult<()> { - let mutations = keyables - .iter() - .map(|keyable| { - let mut key = keyable.key(); - if let Some(path) = key.path.as_mut().and_then(|p| p.first_mut()) { - path.kind = Some(format!("{}-{}", T::kind(), self.env)); - } - Mutation { - insert: None, - delete: Some(key), - update: None, - base_version: None, - upsert: None, - update_time: None, - } - }) - .collect::>(); - - let request = CommitRequest { - database_id: Some("".to_string()), - mode: Some(String::from("NON_TRANSACTIONAL")), - mutations: Some(mutations), - single_use_transaction: None, - transaction: None, - }; - let (_, _) = self - .datastore - .projects() - .commit(request, &self.project_id) - .doit() - .await - .map_err(|e| { - tracing::error!(%e, "failed to delete entities in data store"); - e - })?; - Ok(()) - } -} - #[derive(Clone)] pub struct GcpService { pub project_id: String, - pub datastore: DatastoreService, pub secret_manager: SecretManagerService, pub account_id: AccountId, } @@ -370,7 +93,7 @@ impl GcpService { ) -> anyhow::Result { let project_id = storage_options.gcp_project_id.clone(); let secret_manager; - let datastore = if let Some(gcp_datastore_url) = storage_options.gcp_datastore_url.clone() { + if storage_options.env == "local-test" { let client = hyper::Client::builder().build( hyper_rustls::HttpsConnectorBuilder::new() .with_native_roots() @@ -379,15 +102,11 @@ impl GcpService { .enable_http2() .build(), ); - // Assuming custom GCP URL points to an emulator, so the token does not matter + // Assuming we are in a test environment, 
token does not matter let authenticator = AccessTokenAuthenticator::builder("TOKEN".to_string()) .build() .await?; secret_manager = SecretManager::new(client.clone(), authenticator.clone()); - let mut datastore = Datastore::new(client, authenticator); - datastore.base_url(gcp_datastore_url.clone()); - datastore.root_url(gcp_datastore_url); - datastore } else { // restring client to use https in production let client = hyper::Client::builder().build( @@ -406,17 +125,10 @@ impl GcpService { ApplicationDefaultCredentialsTypes::ServiceAccount(auth) => auth.build().await?, }; secret_manager = SecretManager::new(client.clone(), authenticator.clone()); - Datastore::new(client, authenticator) - }; + } Ok(Self { account_id: account_id.clone(), - datastore: DatastoreService { - datastore, - project_id: project_id.clone(), - env: storage_options.env.clone(), - is_emulator: storage_options.gcp_datastore_url.is_some(), - }, secret_manager: SecretManagerService { secret_manager, project_id: project_id.clone(), diff --git a/chain-signatures/node/src/gcp/value.rs b/chain-signatures/node/src/gcp/value.rs deleted file mode 100644 index e4346ded3..000000000 --- a/chain-signatures/node/src/gcp/value.rs +++ /dev/null @@ -1,383 +0,0 @@ -use google_datastore1::api::{ArrayValue, Entity, Key, LatLng}; -use std::collections::HashMap; - -use super::error::ConvertError; - -#[derive(Debug, Clone)] -pub enum Value { - BooleanValue(bool), - IntegerValue(i64), - DoubleValue(f64), - KeyValue(Key), - StringValue(String), - BlobValue(Vec), - GeoPointValue(f64, f64), - EntityValue { - key: Key, - properties: HashMap, - }, - ArrayValue(Vec), -} - -impl Value { - pub fn type_name(&self) -> &'static str { - match self { - Value::BooleanValue(_) => "bool", - Value::IntegerValue(_) => "integer", - Value::DoubleValue(_) => "double", - Value::KeyValue(_) => "key", - Value::StringValue(_) => "string", - Value::BlobValue(_) => "blob", - Value::GeoPointValue(_, _) => "geopoint", - Value::EntityValue { .. 
} => "entity", - Value::ArrayValue(_) => "array", - } - } -} - -pub trait IntoValue { - fn into_value(self) -> Value; -} - -pub trait FromValue: Sized { - fn from_value(value: Value) -> Result; -} - -/* - * IntoValue implementations - */ - -impl IntoValue for Value { - fn into_value(self) -> Value { - self - } -} - -impl IntoValue for String { - fn into_value(self) -> Value { - Value::StringValue(self) - } -} - -impl IntoValue for &str { - fn into_value(self) -> Value { - String::from(self).into_value() - } -} - -impl IntoValue for i8 { - fn into_value(self) -> Value { - Value::IntegerValue(self as i64) - } -} - -impl IntoValue for i16 { - fn into_value(self) -> Value { - Value::IntegerValue(self as i64) - } -} - -impl IntoValue for i32 { - fn into_value(self) -> Value { - Value::IntegerValue(self as i64) - } -} - -impl IntoValue for i64 { - fn into_value(self) -> Value { - Value::IntegerValue(self) - } -} - -impl IntoValue for f32 { - fn into_value(self) -> Value { - Value::DoubleValue(self as f64) - } -} - -impl IntoValue for f64 { - fn into_value(self) -> Value { - Value::DoubleValue(self) - } -} - -impl IntoValue for bool { - fn into_value(self) -> Value { - Value::BooleanValue(self) - } -} - -impl IntoValue for Key { - fn into_value(self) -> Value { - Value::KeyValue(self) - } -} - -impl IntoValue for Vec { - fn into_value(self) -> Value { - Value::BlobValue(self.to_vec()) - } -} - -impl IntoValue for Vec -where - T: IntoValue, -{ - fn into_value(self) -> Value { - Value::ArrayValue(self.into_iter().map(IntoValue::into_value).collect()) - } -} - -impl From for Value { - fn from(value: google_datastore1::api::Value) -> Value { - if let Some(val) = value.boolean_value { - Value::BooleanValue(val) - } else if let Some(val) = value.integer_value { - Value::IntegerValue(val) - } else if let Some(val) = value.double_value { - Value::DoubleValue(val) - } else if let Some(val) = value.key_value { - Value::KeyValue(val) - } else if let Some(val) = value.string_value { - Value::StringValue(val) - } else if let Some(val) = value.blob_value { - Value::BlobValue(val) - } else if let Some(val) = value.geo_point_value { - Value::GeoPointValue( - val.latitude.unwrap_or_default(), - val.longitude.unwrap_or_default(), - ) - } else if let Some(val) = value.entity_value { - Value::EntityValue { - key: val.key.unwrap_or_default(), - properties: val - .properties - .unwrap_or_default() - .into_iter() - .map(|(k, v)| (k, Value::from(v))) - .collect(), - } - } else if let Some(val) = value.array_value { - Value::ArrayValue( - val.values - .unwrap_or_default() - .into_iter() - .map(Value::from) - .collect(), - ) - } else { - unimplemented!() - } - } -} - -impl IntoValue for google_datastore1::api::Value { - fn into_value(self) -> Value { - self.into() - } -} - -impl IntoValue for google_datastore1::api::Entity { - fn into_value(self) -> Value { - Value::EntityValue { - key: self.key.unwrap_or_default(), - properties: self - .properties - .unwrap_or_default() - .into_iter() - .map(|(k, v)| (k, v.into_value())) - .collect(), - } - } -} - -/* - * FromValue implementations - */ - -impl FromValue for Value { - fn from_value(value: Value) -> Result { - Ok(value) - } -} - -impl FromValue for String { - fn from_value(value: Value) -> Result { - match value { - Value::StringValue(value) => Ok(value), - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("string"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for i64 { - fn from_value(value: Value) -> Result { - match 
value { - Value::IntegerValue(value) => Ok(value), - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("integer"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for f64 { - fn from_value(value: Value) -> Result { - match value { - Value::DoubleValue(value) => Ok(value), - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("double"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for bool { - fn from_value(value: Value) -> Result { - match value { - Value::BooleanValue(value) => Ok(value), - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("bool"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for Key { - fn from_value(value: Value) -> Result { - match value { - Value::KeyValue(value) => Ok(value), - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("key"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for Vec { - fn from_value(value: Value) -> Result, ConvertError> { - match value { - Value::BlobValue(value) => Ok(value), - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("blob"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for Vec -where - T: FromValue, -{ - fn from_value(value: Value) -> Result, ConvertError> { - match value { - Value::ArrayValue(values) => { - let values = values - .into_iter() - .map(FromValue::from_value) - .collect::, ConvertError>>()?; - Ok(values) - } - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("array"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for Entity { - fn from_value(value: Value) -> Result { - match value { - Value::EntityValue { key, properties } => { - let properties = properties - .into_iter() - .map(|(k, v)| { - let v = FromValue::from_value(v)?; - Ok((k, v)) - }) - .collect::, ConvertError>>()?; - Ok(Entity { - key: Some(key), - properties: Some(properties), - }) - } - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("entity"), - got: String::from(value.type_name()), - }), - } - } -} - -impl FromValue for google_datastore1::api::Value { - fn from_value(value: Value) -> Result { - let result = match value { - Value::BooleanValue(val) => google_datastore1::api::Value { - boolean_value: Some(val), - ..Default::default() - }, - Value::IntegerValue(val) => google_datastore1::api::Value { - integer_value: Some(val), - ..Default::default() - }, - Value::DoubleValue(val) => google_datastore1::api::Value { - double_value: Some(val), - ..Default::default() - }, - Value::KeyValue(val) => google_datastore1::api::Value { - key_value: Some(val), - ..Default::default() - }, - Value::StringValue(val) => google_datastore1::api::Value { - string_value: Some(val), - ..Default::default() - }, - Value::BlobValue(val) => google_datastore1::api::Value { - blob_value: Some(val), - ..Default::default() - }, - Value::GeoPointValue(latitude, longitude) => google_datastore1::api::Value { - geo_point_value: Some(LatLng { - latitude: Some(latitude), - longitude: Some(longitude), - }), - ..Default::default() - }, - Value::EntityValue { key, properties } => { - let properties = properties - .into_iter() - .map(|(k, v)| FromValue::from_value(v).map(|v| (k, v))) - .collect::, ConvertError>>()?; - google_datastore1::api::Value { - entity_value: Some(Entity { - key: Some(key), - properties: Some(properties), - }), - ..Default::default() - } - } 
- Value::ArrayValue(val) => { - let values = val - .into_iter() - .map(FromValue::from_value) - .collect::, ConvertError>>()?; - google_datastore1::api::Value { - array_value: Some(ArrayValue { - values: Some(values), - }), - ..Default::default() - } - } - }; - Ok(result) - } -} diff --git a/chain-signatures/node/src/http_client.rs b/chain-signatures/node/src/http_client.rs index 4620a805f..807305239 100644 --- a/chain-signatures/node/src/http_client.rs +++ b/chain-signatures/node/src/http_client.rs @@ -203,7 +203,7 @@ impl MessageQueue { } if uncompacted > 0 { - tracing::info!( + tracing::debug!( uncompacted, compacted, "{from:?} sent messages in {:?};", diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index a744b69d5..bd15a3e7d 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -1,7 +1,5 @@ -use crate::gcp::error::DatastoreStorageError; -use crate::gcp::GcpService; use crate::protocol::{SignQueue, SignRequest}; -use crate::types::LatestBlockHeight; +use crate::storage::app_data_storage::AppDataStorage; use crypto_shared::{derive_epsilon, ScalarExt}; use k256::Scalar; use near_account_id::AccountId; @@ -37,15 +35,6 @@ pub struct Options { #[clap(long, env("MPC_INDEXER_S3_URL"))] pub s3_url: Option, - /// The block height to start indexing from. - // Defaults to the latest block on 2023-11-14 07:40:22 AM UTC - #[clap( - long, - env("MPC_INDEXER_START_BLOCK_HEIGHT"), - default_value = "145964826" - )] - pub start_block_height: u64, - /// The amount of time before we should that our indexer is behind. #[clap(long, env("MPC_INDEXER_BEHIND_THRESHOLD"), default_value = "200")] pub behind_threshold: u64, @@ -62,8 +51,6 @@ impl Options { self.s3_bucket, "--s3-region".to_string(), self.s3_region, - "--start-block-height".to_string(), - self.start_block_height.to_string(), "--behind-threshold".to_string(), self.behind_threshold.to_string(), "--running-threshold".to_string(), @@ -99,9 +86,9 @@ pub struct ContractSignRequest { pub key_version: u32, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Indexer { - latest_block_height: Arc>, + app_data_storage: AppDataStorage, last_updated_timestamp: Arc>, latest_block_timestamp_nanosec: Arc>>, running_threshold: Duration, @@ -109,13 +96,9 @@ pub struct Indexer { } impl Indexer { - fn new(latest_block_height: LatestBlockHeight, options: &Options) -> Self { - tracing::info!( - "creating new indexer, latest block height: {}", - latest_block_height.block_height - ); + fn new(app_data_storage: AppDataStorage, options: &Options) -> Self { Self { - latest_block_height: Arc::new(RwLock::new(latest_block_height)), + app_data_storage: app_data_storage.clone(), last_updated_timestamp: Arc::new(RwLock::new(Instant::now())), latest_block_timestamp_nanosec: Arc::new(RwLock::new(None)), running_threshold: Duration::from_secs(options.running_threshold), @@ -123,9 +106,28 @@ impl Indexer { } } - /// Get the latest block height from the chain. 
- pub async fn latest_block_height(&self) -> BlockHeight { - self.latest_block_height.read().await.block_height + pub async fn last_processed_block(&self) -> Option { + match self.app_data_storage.last_processed_block().await { + Ok(Some(block_height)) => Some(block_height), + Ok(None) => { + tracing::warn!("no last processed block found"); + None + } + Err(err) => { + tracing::warn!(%err, "failed to get last processed block"); + None + } + } + } + + pub async fn set_last_processed_block(&self, block_height: BlockHeight) { + if let Err(err) = self + .app_data_storage + .set_last_processed_block(block_height) + .await + { + tracing::error!(%err, "failed to set last processed block"); + } } /// Check whether the indexer is on track with the latest block height from the chain. @@ -155,17 +157,11 @@ impl Indexer { &self, block_height: BlockHeight, block_timestamp_nanosec: u64, - gcp: &GcpService, - ) -> Result<(), DatastoreStorageError> { + ) { tracing::debug!(block_height, "update_block_height_and_timestamp"); + self.set_last_processed_block(block_height).await; *self.last_updated_timestamp.write().await = Instant::now(); *self.latest_block_timestamp_nanosec.write().await = Some(block_timestamp_nanosec); - self.latest_block_height - .write() - .await - .set(block_height) - .store(gcp) - .await } } @@ -173,7 +169,6 @@ impl Indexer { struct Context { mpc_contract_id: AccountId, node_account_id: AccountId, - gcp_service: GcpService, queue: Arc>, indexer: Indexer, } @@ -263,15 +258,11 @@ async fn handle_block( } ctx.indexer - .update_block_height_and_timestamp( - block.block_height(), - block.header().timestamp_nanosec(), - &ctx.gcp_service, - ) - .await?; + .update_block_height_and_timestamp(block.block_height(), block.header().timestamp_nanosec()) + .await; crate::metrics::LATEST_BLOCK_HEIGHT - .with_label_values(&[ctx.gcp_service.account_id.as_str()]) + .with_label_values(&[ctx.node_account_id.as_str()]) .set(block.block_height() as i64); // Add the requests after going through the whole block to avoid partial processing if indexer fails somewhere. 
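The `LATEST_BLOCK_HEIGHT` update above (and the `NUM_SIGN_REQUESTS` one just below) now take their label from `ctx.node_account_id` instead of the removed GCP service. A small self-contained sketch of that labeling pattern, assuming the `prometheus` crate's `IntGaugeVec` (the `with_label_values(...).set(...)` shape suggests it, but the registry wiring in `crate::metrics` is not shown in this diff, and the metric name below is illustrative):

```rust
use prometheus::{IntGaugeVec, Opts};

fn main() {
    // Illustrative gauge; the real one lives in crate::metrics.
    let latest_block_height = IntGaugeVec::new(
        Opts::new("latest_block_height", "last block height processed by the indexer"),
        &["node_account_id"],
    )
    .unwrap();

    // Same call shape as in handle_block: label by the node's account id, then set.
    latest_block_height
        .with_label_values(&["node0.testnet"])
        .set(12_345);
    assert_eq!(
        latest_block_height
            .with_label_values(&["node0.testnet"])
            .get(),
        12_345
    );
}
```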
@@ -280,7 +271,7 @@ async fn handle_block( for request in pending_requests { queue.add(request); crate::metrics::NUM_SIGN_REQUESTS - .with_label_values(&[ctx.gcp_service.account_id.as_str()]) + .with_label_values(&[ctx.node_account_id.as_str()]) .inc(); } drop(queue); @@ -302,36 +293,21 @@ pub fn run( mpc_contract_id: &AccountId, node_account_id: &AccountId, queue: &Arc>, - gcp_service: &crate::gcp::GcpService, - rt: &tokio::runtime::Runtime, + app_data_storage: AppDataStorage, + rpc_client: near_fetch::Client, ) -> anyhow::Result<(JoinHandle>, Indexer)> { tracing::info!( s3_bucket = options.s3_bucket, s3_region = options.s3_region, s3_url = options.s3_url, - start_block_height = options.start_block_height, %mpc_contract_id, "starting indexer" ); - let latest_block_height = rt.block_on(async { - match LatestBlockHeight::fetch(gcp_service).await { - Ok(latest) => latest, - Err(err) => { - tracing::warn!(%err, "failed to fetch latest block height; using start_block_height={} instead", options.start_block_height); - LatestBlockHeight { - account_id: node_account_id.clone(), - block_height: options.start_block_height, - } - } - } - }); - - let indexer = Indexer::new(latest_block_height, options); + let indexer = Indexer::new(app_data_storage.clone(), options); let context = Context { mpc_contract_id: mpc_contract_id.clone(), node_account_id: node_account_id.clone(), - gcp_service: gcp_service.clone(), queue: queue.clone(), indexer: indexer.clone(), }; @@ -351,24 +327,30 @@ pub fn run( i += 1; let Ok(lake) = rt.block_on(async { - let latest = context.indexer.latest_block_height().await; - if i > 0 { - tracing::warn!("indexer latest height {latest}, restart count={i}"); - } - let mut lake_builder = LakeBuilder::default() - .s3_bucket_name(&options.s3_bucket) - .s3_region_name(&options.s3_region) - .start_block_height(latest); - - if let Some(s3_url) = &options.s3_url { - let aws_config = aws_config::from_env().load().await; - let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) - .endpoint_url(s3_url) - .build(); - lake_builder = lake_builder.s3_config(s3_config); + update_last_processed_block(rpc_client.clone(), app_data_storage.clone()).await?; + + if let Some(latest) = context.indexer.last_processed_block().await { + if i > 0 { + tracing::warn!("indexer latest height {latest}, restart count={i}"); + } + let mut lake_builder = LakeBuilder::default() + .s3_bucket_name(&options.s3_bucket) + .s3_region_name(&options.s3_region) + .start_block_height(latest); + + if let Some(s3_url) = &options.s3_url { + let aws_config = aws_config::from_env().load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&aws_config) + .endpoint_url(s3_url) + .build(); + lake_builder = lake_builder.s3_config(s3_config); + } + let lake = lake_builder.build()?; + anyhow::Ok(lake) + } else { + tracing::warn!("indexer failed to get last processed block"); + Err(anyhow::anyhow!("failed to get last processed block")) } - let lake = lake_builder.build()?; - anyhow::Ok(lake) }) else { tracing::error!(?options, "indexer failed to build"); backoff(i, 1, 120); @@ -427,6 +409,52 @@ pub fn run( Ok((join_handle, indexer)) } +/// This function ensures we do not go back in time a lot when restarting the node +async fn update_last_processed_block( + rpc_client: near_fetch::Client, + app_data_storage: AppDataStorage, +) -> anyhow::Result<()> { + let last_processed_block = match app_data_storage.last_processed_block().await { + Ok(Some(block_height)) => block_height, + Ok(None) => 0, + Err(err) => { + 
tracing::warn!(%err, "failed to get last processed block"); + return Err(err); + } + }; + + let latest_block: u64 = rpc_client.view_block().await?.header.height; + + if last_processed_block > latest_block { + let error_message = format!( + "last processed block is greater than latest block: last_processed_block={}, latest_block={}", + last_processed_block, latest_block + ); + tracing::error!("{}", error_message); + Err(anyhow::anyhow!(error_message))?; + } + + const MAX_YIELD_RESUME_BLOCKS: u64 = 200; + let starting_block: u64 = { + if latest_block - last_processed_block < MAX_YIELD_RESUME_BLOCKS { + last_processed_block + } else { + latest_block.saturating_sub(MAX_YIELD_RESUME_BLOCKS) + } + }; + app_data_storage + .set_last_processed_block(starting_block) + .await?; + + tracing::info!( + "set last processed block to {} to start indexer with, previous last processed: {}, latest block: {}", + starting_block, + last_processed_block, + latest_block, + ); + Ok(()) +} + fn backoff(i: u32, multiplier: u32, max: u64) { // Exponential backoff with max delay of max seconds let delay: u64 = std::cmp::min(2u64.pow(i).mul(multiplier as u64), max); diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index d50fd0317..f1a1a0064 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -4,7 +4,6 @@ use super::state::{ WaitingForConsensusState, }; use super::{Config, SignQueue}; -use crate::gcp::error::DatastoreStorageError; use crate::gcp::error::SecretStorageError; use crate::http_client::MessageQueue; use crate::protocol::contract::primitives::Participants; @@ -12,9 +11,9 @@ use crate::protocol::presignature::PresignatureManager; use crate::protocol::signature::SignatureManager; use crate::protocol::state::{GeneratingState, ResharingState}; use crate::protocol::triple::TripleManager; -use crate::storage::presignature_storage::PresignatureRedisStorage; +use crate::storage::presignature_storage::PresignatureStorage; use crate::storage::secret_storage::SecretNodeStorageBox; -use crate::storage::triple_storage::TripleRedisStorage; +use crate::storage::triple_storage::TripleStorage; use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use crate::util::AffinePointExt; use crate::{http_client, rpc_client}; @@ -40,8 +39,8 @@ pub trait ConsensusCtx { fn my_address(&self) -> &Url; fn sign_queue(&self) -> Arc>; fn secret_storage(&self) -> &SecretNodeStorageBox; - fn triple_storage(&self) -> &TripleRedisStorage; - fn presignature_storage(&self) -> &PresignatureRedisStorage; + fn triple_storage(&self) -> &TripleStorage; + fn presignature_storage(&self) -> &PresignatureStorage; fn cfg(&self) -> &Config; fn message_options(&self) -> http_client::Options; } @@ -68,8 +67,6 @@ pub enum ConsensusError { CaitSithInitializationError(#[from] InitializationError), #[error("secret storage error: {0}")] SecretStorageError(SecretStorageError), - #[error("datastore storage error: {0}")] - DatastoreStorageError(DatastoreStorageError), } impl From for ConsensusError { @@ -78,12 +75,6 @@ impl From for ConsensusError { } } -impl From for ConsensusError { - fn from(err: DatastoreStorageError) -> Self { - ConsensusError::DatastoreStorageError(err) - } -} - #[async_trait] pub trait ConsensusProtocol { async fn advance( diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 7109e7c4c..c7ea2cd8c 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ 
b/chain-signatures/node/src/protocol/mod.rs @@ -29,9 +29,9 @@ use crate::protocol::consensus::ConsensusProtocol; use crate::protocol::cryptography::CryptographicProtocol; use crate::protocol::message::{MessageHandler, MpcMessageQueue}; use crate::rpc_client; -use crate::storage::presignature_storage::PresignatureRedisStorage; +use crate::storage::presignature_storage::PresignatureStorage; use crate::storage::secret_storage::SecretNodeStorageBox; -use crate::storage::triple_storage::TripleRedisStorage; +use crate::storage::triple_storage::TripleStorage; use cait_sith::protocol::Participant; use near_account_id::AccountId; @@ -53,8 +53,8 @@ struct Ctx { http_client: reqwest::Client, sign_queue: Arc>, secret_storage: SecretNodeStorageBox, - triple_storage: TripleRedisStorage, - presignature_storage: PresignatureRedisStorage, + triple_storage: TripleStorage, + presignature_storage: PresignatureStorage, cfg: Config, mesh: Mesh, message_options: http_client::Options, @@ -97,11 +97,11 @@ impl ConsensusCtx for &mut MpcSignProtocol { &self.ctx.cfg } - fn triple_storage(&self) -> &TripleRedisStorage { + fn triple_storage(&self) -> &TripleStorage { &self.ctx.triple_storage } - fn presignature_storage(&self) -> &PresignatureRedisStorage { + fn presignature_storage(&self) -> &PresignatureStorage { &self.ctx.presignature_storage } @@ -177,8 +177,8 @@ impl MpcSignProtocol { receiver: mpsc::Receiver, sign_queue: Arc>, secret_storage: SecretNodeStorageBox, - triple_storage: TripleRedisStorage, - presignature_storage: PresignatureRedisStorage, + triple_storage: TripleStorage, + presignature_storage: PresignatureStorage, cfg: Config, mesh_options: mesh::Options, message_options: http_client::Options, diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 6aecae80b..865df5197 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -1,7 +1,7 @@ use super::message::PresignatureMessage; use super::triple::{Triple, TripleId, TripleManager}; use crate::protocol::contract::primitives::Participants; -use crate::storage::presignature_storage::PresignatureRedisStorage; +use crate::storage::presignature_storage::PresignatureStorage; use crate::types::{PresignatureProtocol, SecretKeyShare}; use crate::util::AffinePointExt; @@ -150,7 +150,7 @@ pub enum GenerationError { /// Abstracts how triples are generated by providing a way to request a new triple that will be /// complete some time in the future and a way to take an already generated triple. pub struct PresignatureManager { - presignature_storage: PresignatureRedisStorage, + presignature_storage: PresignatureStorage, /// Ongoing presignature generation protocols. generators: HashMap, /// The set of presignatures that were introduced to the system by the current node. 
@@ -171,7 +171,7 @@ impl PresignatureManager { threshold: usize, epoch: u64, my_account_id: &AccountId, - storage: &PresignatureRedisStorage, + storage: &PresignatureStorage, ) -> Self { Self { presignature_storage: storage.clone(), diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index cb9ef8baa..2e80d3386 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -2,7 +2,7 @@ use super::contract::primitives::Participants; use super::cryptography::CryptographicError; use super::message::TripleMessage; use super::presignature::GenerationError; -use crate::storage::triple_storage::TripleRedisStorage; +use crate::storage::triple_storage::TripleStorage; use crate::types::TripleProtocol; use crate::util::AffinePointExt; @@ -80,7 +80,7 @@ impl TripleGenerator { /// complete some time in the future and a way to take an already generated triple. pub struct TripleManager { /// Triple Storage - pub triple_storage: TripleRedisStorage, + pub triple_storage: TripleStorage, /// The pool of triple protocols that have yet to be completed. pub generators: HashMap, @@ -128,7 +128,7 @@ impl TripleManager { threshold: usize, epoch: u64, my_account_id: &AccountId, - storage: &TripleRedisStorage, + storage: &TripleStorage, ) -> Self { Self { generators: HashMap::new(), diff --git a/chain-signatures/node/src/storage/app_data_storage.rs b/chain-signatures/node/src/storage/app_data_storage.rs new file mode 100644 index 000000000..30041ef6c --- /dev/null +++ b/chain-signatures/node/src/storage/app_data_storage.rs @@ -0,0 +1,43 @@ +use anyhow::Ok; +use deadpool_redis::Pool; +use near_primitives::types::BlockHeight; +use near_sdk::AccountId; +use redis::AsyncCommands; + +const APP_DATA_PREFIX: &str = "app_data"; +const APP_DATA_STORAGE_VERSION: &str = "v1"; + +pub fn init(pool: &Pool, node_account_id: &AccountId) -> AppDataStorage { + AppDataStorage { + redis_pool: pool.clone(), + node_account_id: node_account_id.clone(), + } +} + +#[derive(Clone)] +pub struct AppDataStorage { + redis_pool: Pool, + node_account_id: AccountId, +} + +impl AppDataStorage { + pub async fn set_last_processed_block(&self, height: BlockHeight) -> anyhow::Result<()> { + let mut conn = self.redis_pool.get().await?; + conn.set::<&str, BlockHeight, ()>(&self.last_block_key(), height) + .await?; + Ok(()) + } + + pub async fn last_processed_block(&self) -> anyhow::Result> { + let mut conn = self.redis_pool.get().await?; + let result: Option = conn.get(self.last_block_key()).await?; + Ok(result) + } + + fn last_block_key(&self) -> String { + format!( + "{}:{}:{}:last_block", + APP_DATA_PREFIX, APP_DATA_STORAGE_VERSION, self.node_account_id + ) + } +} diff --git a/chain-signatures/node/src/storage/mod.rs b/chain-signatures/node/src/storage/mod.rs index 550fe2378..55231df93 100644 --- a/chain-signatures/node/src/storage/mod.rs +++ b/chain-signatures/node/src/storage/mod.rs @@ -1,3 +1,4 @@ +pub mod app_data_storage; pub mod presignature_storage; pub mod secret_storage; pub mod triple_storage; @@ -6,7 +7,7 @@ pub mod triple_storage; #[derive(Debug, Clone, clap::Parser)] #[group(id = "storage_options")] pub struct Options { - /// env used to suffix datastore table names to differentiate among environments. + /// env used to differentiate among environments. #[clap(long, env("MPC_ENV"))] pub env: String, /// GCP project ID. 
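A hedged usage sketch for the new `AppDataStorage` file above: it is a thin Redis wrapper, so a round trip needs only a pool and an account id. The Redis URL and account id below are placeholders (the node takes them from `--redis-url` / `MPC_REDIS_URL` and its own identity), and note that the `v1` segment in `last_block_key` means bumping `APP_DATA_STORAGE_VERSION` effectively clears the stored height after a breaking change:

```rust
use mpc_node::storage::app_data_storage;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder URL; in the node this comes from the storage options.
    let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
    let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;

    let account_id: near_sdk::AccountId = "node0.testnet".parse()?;
    let storage = app_data_storage::init(&pool, &account_id);

    // Keys look like `app_data:v1:node0.testnet:last_block`.
    storage.set_last_processed_block(42).await?;
    assert_eq!(storage.last_processed_block().await?, Some(42));
    Ok(())
}
```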
@@ -16,9 +17,6 @@ pub struct Options { #[clap(long, env("MPC_SK_SHARE_SECRET_ID"), requires_all=["gcp_project_id"])] pub sk_share_secret_id: Option, /// Mostly for integration tests. - /// GCP Datastore URL that will be used to load/store the node's triples and presignatures. - #[arg(long, env("MPC_GCP_DATASTORE_URL"))] - pub gcp_datastore_url: Option, #[arg(long, env("MPC_SK_SHARE_LOCAL_PATH"))] pub sk_share_local_path: Option, #[arg(long, env("MPC_REDIS_URL"))] @@ -36,9 +34,6 @@ impl Options { if let Some(sk_share_secret_id) = self.sk_share_secret_id { opts.extend(vec!["--sk-share-secret-id".to_string(), sk_share_secret_id]); } - if let Some(gcp_datastore_url) = self.gcp_datastore_url { - opts.extend(vec!["--gcp-datastore-url".to_string(), gcp_datastore_url]); - } if let Some(sk_share_local_path) = self.sk_share_local_path { opts.extend(vec![ "--sk-share-local-path".to_string(), diff --git a/chain-signatures/node/src/storage/presignature_storage.rs b/chain-signatures/node/src/storage/presignature_storage.rs index 0f750185a..0138d27fe 100644 --- a/chain-signatures/node/src/storage/presignature_storage.rs +++ b/chain-signatures/node/src/storage/presignature_storage.rs @@ -10,20 +10,20 @@ type PresigResult = std::result::Result; // Can be used to "clear" redis storage in case of a breaking change const PRESIGNATURE_STORAGE_VERSION: &str = "v1"; -pub fn init(pool: &Pool, node_account_id: &AccountId) -> PresignatureRedisStorage { - PresignatureRedisStorage { +pub fn init(pool: &Pool, node_account_id: &AccountId) -> PresignatureStorage { + PresignatureStorage { redis_pool: pool.clone(), node_account_id: node_account_id.clone(), } } #[derive(Clone)] -pub struct PresignatureRedisStorage { +pub struct PresignatureStorage { redis_pool: Pool, node_account_id: AccountId, } -impl PresignatureRedisStorage { +impl PresignatureStorage { pub async fn insert(&self, presignature: Presignature) -> PresigResult<()> { let mut connection = self.redis_pool.get().await?; connection diff --git a/chain-signatures/node/src/storage/triple_storage.rs b/chain-signatures/node/src/storage/triple_storage.rs index 4a76c3365..ff5e39fe1 100644 --- a/chain-signatures/node/src/storage/triple_storage.rs +++ b/chain-signatures/node/src/storage/triple_storage.rs @@ -10,20 +10,20 @@ type TripleResult = std::result::Result; // Can be used to "clear" redis storage in case of a breaking change const TRIPLE_STORAGE_VERSION: &str = "v1"; -pub fn init(pool: &Pool, account_id: &AccountId) -> TripleRedisStorage { - TripleRedisStorage { +pub fn init(pool: &Pool, account_id: &AccountId) -> TripleStorage { + TripleStorage { redis_pool: pool.clone(), node_account_id: account_id.clone(), } } #[derive(Clone)] -pub struct TripleRedisStorage { +pub struct TripleStorage { redis_pool: Pool, node_account_id: AccountId, } -impl TripleRedisStorage { +impl TripleStorage { pub async fn insert(&self, triple: Triple) -> TripleResult<()> { let mut conn = self.redis_pool.get().await?; conn.hset::<&str, TripleId, Triple, ()>(&self.triple_key(), triple.id, triple) diff --git a/chain-signatures/node/src/types.rs b/chain-signatures/node/src/types.rs index 704bba090..f190c34ab 100644 --- a/chain-signatures/node/src/types.rs +++ b/chain-signatures/node/src/types.rs @@ -8,13 +8,8 @@ use crypto_shared::PublicKey; use k256::{elliptic_curve::CurveArithmetic, Secp256k1}; use tokio::sync::{RwLock, RwLockWriteGuard}; -use crate::gcp::error::ConvertError; -use crate::gcp::value::{FromValue, IntoValue, Value}; -use crate::gcp::{DatastoreResult, GcpService, KeyKind}; use 
crate::protocol::contract::ResharingContractState; -use near_account_id::AccountId; - pub type SecretKeyShare = ::Scalar; pub type TripleProtocol = Box> + Send + Sync>; @@ -133,106 +128,3 @@ impl ReshareProtocol { self.protocol.write().await } } - -#[derive(Clone, Debug)] -pub struct LatestBlockHeight { - pub account_id: AccountId, - pub block_height: near_primitives::types::BlockHeight, -} - -impl LatestBlockHeight { - pub async fn fetch(gcp: &GcpService) -> DatastoreResult { - gcp.datastore - .get(format!("{}/latest-block-height", gcp.account_id)) - .await - } - - pub fn set(&mut self, block_height: near_primitives::types::BlockHeight) -> &mut Self { - self.block_height = block_height; - self - } - - pub async fn store(&self, gcp: &GcpService) -> DatastoreResult<()> { - gcp.datastore.upsert(self).await - } -} - -impl IntoValue for LatestBlockHeight { - fn into_value(self) -> Value { - (&self).into_value() - } -} - -impl IntoValue for &LatestBlockHeight { - fn into_value(self) -> Value { - let properties = { - let mut properties = std::collections::HashMap::new(); - properties.insert( - "account_id".to_string(), - Value::StringValue(self.account_id.to_string()), - ); - properties.insert( - "block_height".to_string(), - Value::IntegerValue(self.block_height as i64), - ); - properties - }; - Value::EntityValue { - key: google_datastore1::api::Key { - path: Some(vec![google_datastore1::api::PathElement { - kind: Some(LatestBlockHeight::kind()), - name: Some(format!("{}/latest-block-height", self.account_id)), - id: None, - }]), - partition_id: None, - }, - properties, - } - } -} - -impl FromValue for LatestBlockHeight { - fn from_value(value: Value) -> Result { - match value { - Value::EntityValue { - key: _, - mut properties, - } => { - let account_id = properties - .remove("account_id") - .ok_or_else(|| ConvertError::MissingProperty("account_id".to_string()))?; - let account_id = String::from_value(account_id)?.parse().map_err(|err| { - ConvertError::MalformedProperty(format!( - "LatestBlockHeight failed to parse account_id: {err:?}", - )) - })?; - - let block_height = properties - .remove("block_height") - .ok_or_else(|| ConvertError::MissingProperty("block_height".to_string()))?; - let block_height = i64::from_value(block_height)? 
as u64; - - Ok(LatestBlockHeight { - account_id, - block_height, - }) - } - _ => Err(ConvertError::UnexpectedPropertyType { - expected: String::from("integer"), - got: String::from(value.type_name()), - }), - } - } -} - -impl KeyKind for LatestBlockHeight { - fn kind() -> String { - "LatestBlockHeight".to_string() - } -} - -impl KeyKind for &LatestBlockHeight { - fn kind() -> String { - "LatestBlockHeight".to_string() - } -} diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index a39a8d6e7..633267514 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -130,7 +130,8 @@ pub enum StateView { #[tracing::instrument(level = "debug", skip_all)] async fn state(Extension(state): Extension>) -> Result> { tracing::debug!("fetching state"); - let latest_block_height = state.indexer.latest_block_height().await; + // TODO: rename to last_processed_block when making other breaking changes + let latest_block_height = state.indexer.last_processed_block().await.unwrap_or(0); let is_stable = state.indexer.is_stable().await; let protocol_state = state.protocol_state.read().await; diff --git a/infra/modules/multichain/main.tf b/infra/modules/multichain/main.tf index 91b0dbc1f..08a98b34f 100644 --- a/infra/modules/multichain/main.tf +++ b/infra/modules/multichain/main.tf @@ -57,10 +57,6 @@ resource "google_cloud_run_v2_service" "node" { value = var.indexer_options.s3_url } } - env { - name = "MPC_INDEXER_START_BLOCK_HEIGHT" - value = var.indexer_options.start_block_height - } env { name = "MPC_ACCOUNT_SK" value_source { diff --git a/infra/multichain-dev/main.tf b/infra/multichain-dev/main.tf index 6ff42f1af..6a8d341f6 100644 --- a/infra/multichain-dev/main.tf +++ b/infra/multichain-dev/main.tf @@ -98,7 +98,6 @@ resource "google_service_account" "service_account" { resource "google_project_iam_member" "sa-roles" { for_each = toset([ - "roles/datastore.user", "roles/secretmanager.admin", "roles/storage.objectAdmin", "roles/iam.serviceAccountAdmin", diff --git a/infra/multichain-dev/variables.tf b/infra/multichain-dev/variables.tf index 2035e3243..dc0d70997 100644 --- a/infra/multichain-dev/variables.tf +++ b/infra/multichain-dev/variables.tf @@ -103,10 +103,6 @@ variable "static_env" { name = "MPC_INDEXER_S3_BUCKET" value = "near-lake-data-testnet" }, - { - name = "MPC_INDEXER_START_BLOCK_HEIGHT" - value = 180133172 - }, { name = "AWS_DEFAULT_REGION" value = "eu-central-1" diff --git a/infra/multichain-mainnet/main.tf b/infra/multichain-mainnet/main.tf index 3de0b4ee4..125f6e5e7 100644 --- a/infra/multichain-mainnet/main.tf +++ b/infra/multichain-mainnet/main.tf @@ -95,7 +95,6 @@ resource "google_service_account" "service_account" { resource "google_project_iam_member" "sa-roles" { for_each = toset([ - "roles/datastore.user", "roles/secretmanager.admin", "roles/storage.objectAdmin", "roles/iam.serviceAccountAdmin", diff --git a/infra/multichain-mainnet/variables.tf b/infra/multichain-mainnet/variables.tf index 98ba6504f..6b67fe16c 100644 --- a/infra/multichain-mainnet/variables.tf +++ b/infra/multichain-mainnet/variables.tf @@ -104,10 +104,6 @@ variable "static_env" { name = "MPC_INDEXER_S3_BUCKET" value = "near-lake-data-mainnet" }, - { - name = "MPC_INDEXER_START_BLOCK_HEIGHT" - value = 131590539 - }, { name = "AWS_DEFAULT_REGION" value = "eu-central-1" diff --git a/infra/multichain-testnet/main.tf b/infra/multichain-testnet/main.tf index 320bc3d6e..8af43bece 100644 --- a/infra/multichain-testnet/main.tf +++ 
b/infra/multichain-testnet/main.tf @@ -97,7 +97,6 @@ resource "google_service_account" "service_account" { resource "google_project_iam_member" "sa-roles" { for_each = toset([ - "roles/datastore.user", "roles/secretmanager.admin", "roles/storage.objectAdmin", "roles/iam.serviceAccountAdmin", diff --git a/infra/multichain-testnet/variables.tf b/infra/multichain-testnet/variables.tf index 1628a7d4a..1e8a9c853 100644 --- a/infra/multichain-testnet/variables.tf +++ b/infra/multichain-testnet/variables.tf @@ -103,10 +103,6 @@ variable "static_env" { name = "MPC_INDEXER_S3_BUCKET" value = "near-lake-data-testnet" }, - { - name = "MPC_INDEXER_START_BLOCK_HEIGHT" - value = 177673773 - }, { name = "AWS_DEFAULT_REGION" value = "eu-central-1" diff --git a/infra/partner-mainnet/main.tf b/infra/partner-mainnet/main.tf index a11b604e8..6221e1fa2 100644 --- a/infra/partner-mainnet/main.tf +++ b/infra/partner-mainnet/main.tf @@ -94,7 +94,6 @@ resource "google_service_account" "service_account" { resource "google_project_iam_member" "sa-roles" { for_each = toset([ - "roles/datastore.user", "roles/secretmanager.admin", "roles/storage.objectAdmin", "roles/iam.serviceAccountAdmin", diff --git a/infra/partner-mainnet/variables.tf b/infra/partner-mainnet/variables.tf index 363b9bcd4..b38bc4adf 100644 --- a/infra/partner-mainnet/variables.tf +++ b/infra/partner-mainnet/variables.tf @@ -104,10 +104,6 @@ variable "static_env" { name = "MPC_INDEXER_S3_BUCKET" value = "near-lake-data-mainnet" }, - { - name = "MPC_INDEXER_START_BLOCK_HEIGHT" - value = 124647189 - }, { name = "AWS_DEFAULT_REGION" value = "eu-central-1" diff --git a/infra/partner-testnet/main.tf b/infra/partner-testnet/main.tf index 0aef67bae..520bea8f3 100644 --- a/infra/partner-testnet/main.tf +++ b/infra/partner-testnet/main.tf @@ -94,7 +94,6 @@ resource "google_service_account" "service_account" { resource "google_project_iam_member" "sa-roles" { for_each = toset([ - "roles/datastore.user", "roles/secretmanager.admin", "roles/storage.objectAdmin", "roles/iam.serviceAccountAdmin", diff --git a/infra/partner-testnet/variables.tf b/infra/partner-testnet/variables.tf index 05141ea58..10d287370 100644 --- a/infra/partner-testnet/variables.tf +++ b/infra/partner-testnet/variables.tf @@ -101,10 +101,6 @@ variable "static_env" { name = "MPC_INDEXER_S3_BUCKET" value = "near-lake-data-testnet" }, - { - name = "MPC_INDEXER_START_BLOCK_HEIGHT" - value = 158767549 - }, { name = "AWS_DEFAULT_REGION" value = "eu-central-1" diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index 460d51d23..3c449cc63 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -96,7 +96,6 @@ impl<'a> Node<'a> { s3_bucket: ctx.localstack.s3_bucket.clone(), s3_region: ctx.localstack.s3_region.clone(), s3_url: Some(ctx.localstack.s3_host_address.clone()), - start_block_height: 0, running_threshold: 120, behind_threshold: 120, }; @@ -563,66 +562,6 @@ impl Default for DockerClient { } } -pub struct Datastore<'a> { - pub container: Container<'a, GenericImage>, - pub address: String, - pub local_address: String, -} - -impl<'a> Datastore<'a> { - pub const CONTAINER_PORT: u16 = 3000; - - pub async fn run( - docker_client: &'a DockerClient, - network: &str, - project_id: &str, - ) -> anyhow::Result> { - tracing::info!("Running datastore container..."); - let image = GenericImage::new( - "gcr.io/google.com/cloudsdktool/google-cloud-cli", - 
"464.0.0-emulators", - ) - .with_wait_for(WaitFor::message_on_stderr("Dev App Server is now running.")) - .with_exposed_port(Self::CONTAINER_PORT) - .with_entrypoint("gcloud") - .with_env_var( - "DATASTORE_EMULATOR_HOST", - format!("0.0.0.0:{}", Self::CONTAINER_PORT), - ) - .with_env_var("DATASTORE_PROJECT_ID", project_id); - let image: RunnableImage = ( - image, - vec![ - "beta".to_string(), - "emulators".to_string(), - "datastore".to_string(), - "start".to_string(), - format!("--project={project_id}"), - "--host-port".to_string(), - format!("0.0.0.0:{}", Self::CONTAINER_PORT), - "--no-store-on-disk".to_string(), - "--consistency=1.0".to_string(), - ], - ) - .into(); - let image = image.with_network(network); - let container = docker_client.cli.run(image); - let ip_address = docker_client - .get_network_ip_address(&container, network) - .await?; - let host_port = container.get_host_port_ipv4(Self::CONTAINER_PORT); - - let full_address = format!("http://{}:{}/", ip_address, Self::CONTAINER_PORT); - let local_address = format!("http://127.0.0.1:{}/", host_port); - tracing::info!("Datastore container is running at {}", full_address); - Ok(Datastore { - container, - local_address, - address: full_address, - }) - } -} - pub struct Redis<'a> { pub container: Container<'a, GenericImage>, pub internal_address: String, diff --git a/integration-tests/chain-signatures/src/lib.rs b/integration-tests/chain-signatures/src/lib.rs index 54f470d4b..08e4fd548 100644 --- a/integration-tests/chain-signatures/src/lib.rs +++ b/integration-tests/chain-signatures/src/lib.rs @@ -19,7 +19,7 @@ use mpc_node::gcp::GcpService; use mpc_node::http_client; use mpc_node::mesh; use mpc_node::storage; -use mpc_node::storage::triple_storage::TripleRedisStorage; +use mpc_node::storage::triple_storage::TripleStorage; use near_crypto::KeyFile; use near_workspaces::network::{Sandbox, ValidatorKey}; use near_workspaces::types::{KeyType, SecretKey}; @@ -156,11 +156,7 @@ impl Nodes<'_> { Ok(()) } - pub async fn triple_storage( - &self, - redis_pool: &Pool, - account_id: &AccountId, - ) -> TripleRedisStorage { + pub async fn triple_storage(&self, redis_pool: &Pool, account_id: &AccountId) -> TripleStorage { storage::triple_storage::init(redis_pool, account_id) } @@ -204,7 +200,6 @@ pub struct Context<'a> { pub lake_indexer: crate::containers::LakeIndexer<'a>, pub worker: Worker, pub mpc_contract: Contract, - pub datastore: crate::containers::Datastore<'a>, pub redis: crate::containers::Redis<'a>, pub storage_options: storage::Options, pub mesh_options: mesh::Options, @@ -231,10 +226,6 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> .await?; tracing::info!(contract_id = %mpc_contract.id(), "deployed mpc contract"); - let gcp_project_id = "multichain-integration"; - let datastore = - crate::containers::Datastore::run(docker_client, docker_network, gcp_project_id).await?; - let redis = crate::containers::Redis::run(docker_client, docker_network).await?; let redis_url = redis.internal_address.clone(); @@ -243,7 +234,6 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> env: "local-test".to_string(), gcp_project_id: "multichain-integration".to_string(), sk_share_secret_id: None, - gcp_datastore_url: Some(datastore.local_address.clone()), sk_share_local_path: Some(sk_share_local_path), redis_url, }; @@ -263,7 +253,6 @@ pub async fn setup(docker_client: &DockerClient) -> anyhow::Result> lake_indexer, worker, mpc_contract, - datastore, redis, storage_options, mesh_options, diff --git 
a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index 923ccb155..fca763ed6 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -52,7 +52,6 @@ impl Node { s3_bucket: ctx.localstack.s3_bucket.clone(), s3_region: ctx.localstack.s3_region.clone(), s3_url: Some(ctx.localstack.s3_host_address.clone()), - start_block_height: 0, running_threshold: 120, behind_threshold: 120, }; @@ -147,7 +146,6 @@ impl Node { s3_bucket: ctx.localstack.s3_bucket.clone(), s3_region: ctx.localstack.s3_region.clone(), s3_url: Some(ctx.localstack.s3_host_address.clone()), - start_block_height: 0, running_threshold: 120, behind_threshold: 120, }; diff --git a/integration-tests/chain-signatures/src/main.rs b/integration-tests/chain-signatures/src/main.rs index 37f01f913..e928497f1 100644 --- a/integration-tests/chain-signatures/src/main.rs +++ b/integration-tests/chain-signatures/src/main.rs @@ -60,7 +60,6 @@ async fn main() -> anyhow::Result<()> { println!(" release: {}", ctx.release); println!("\nExternal services:"); - println!(" datastore: {}", ctx.datastore.local_address); println!(" lake_indexer: {}", ctx.lake_indexer.rpc_host_address); println!(" redis: {}", ctx.redis.internal_address); diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index 2f696b0b1..4c352913e 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -19,7 +19,6 @@ use mpc_node::kdf::into_eth_sig; use mpc_node::protocol::presignature::{Presignature, PresignatureId, PresignatureManager}; use mpc_node::protocol::triple::{Triple, TripleManager}; use mpc_node::storage; -use mpc_node::types::LatestBlockHeight; use mpc_node::util::NearPublicKeyExt; use near_account_id::AccountId; use test_log::test; @@ -398,37 +397,6 @@ fn dummy_triple(id: u64) -> Triple { } } -#[test(tokio::test)] -async fn test_latest_block_height() -> anyhow::Result<()> { - with_multichain_nodes(MultichainConfig::default(), |ctx| { - Box::pin(async move { - let state_0 = wait_for::running_mpc(&ctx, Some(0)).await?; - assert_eq!(state_0.participants.len(), 3); - wait_for::has_at_least_triples(&ctx, 2).await?; - wait_for::has_at_least_presignatures(&ctx, 2).await?; - - let gcp_services = ctx.nodes.gcp_services().await?; - for gcp_service in &gcp_services { - let latest = LatestBlockHeight::fetch(gcp_service).await?; - assert!(latest.block_height > 10); - } - - // test manually updating the latest block height - let gcp_service = gcp_services[0].clone(); - let latest = LatestBlockHeight { - account_id: gcp_service.account_id.clone(), - block_height: 1000, - }; - latest.store(&gcp_service).await?; - let new_latest = LatestBlockHeight::fetch(&gcp_service).await?; - assert_eq!(new_latest.block_height, latest.block_height); - - Ok(()) - }) - }) - .await -} - #[test(tokio::test)] async fn test_signature_offline_node_back_online() -> anyhow::Result<()> { with_multichain_nodes(MultichainConfig::default(), |mut ctx| {
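Since `test_latest_block_height` is removed above without a direct successor, here is a hedged sketch of the restart clamp that `update_last_processed_block` (added to `indexer.rs` earlier in this diff) enforces, extracted into a pure function so the arithmetic is testable without Redis or an RPC node. `resume_height` is a hypothetical name — the node inlines this logic — and the real function first errors out when the stored height is ahead of the chain head:

```rust
const MAX_YIELD_RESUME_BLOCKS: u64 = 200;

/// Hypothetical extraction of the restart clamp: resume exactly where we left
/// off if we are close to the chain head, otherwise skip to head minus the
/// window so the node does not replay arbitrarily old blocks.
fn resume_height(last_processed: u64, latest: u64) -> u64 {
    if latest - last_processed < MAX_YIELD_RESUME_BLOCKS {
        last_processed
    } else {
        latest.saturating_sub(MAX_YIELD_RESUME_BLOCKS)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn clamps_restart_height() {
        // Close to the head: resume from the stored height.
        assert_eq!(resume_height(4_990, 5_000), 4_990);
        // Far behind (e.g. a node down for hours): jump to head - 200.
        assert_eq!(resume_height(0, 5_000), 4_800);
        // Whole chain fits inside the window: resume from the stored height.
        assert_eq!(resume_height(100, 150), 100);
    }
}
```

This is also why the `start_block_height` option and its Terraform plumbing could be deleted: a fresh node with no stored height starts at most 200 blocks behind the current head instead of at a hardcoded historical block.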