Skip to content

Commit

Permalink
[move] Stateful (de)serialization for function value lookups & delaye…
Browse files Browse the repository at this point in the history
…d fields improvements (#15594)
  • Loading branch information
georgemitenkov authored Dec 20, 2024
1 parent 8aa15a6 commit 2bea962
Show file tree
Hide file tree
Showing 43 changed files with 1,155 additions and 895 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@ use move_binary_format::{
CompiledModule,
};
use move_core_types::{
account_address::AccountAddress, identifier::IdentStr, language_storage::ModuleId,
account_address::AccountAddress,
identifier::IdentStr,
language_storage::{ModuleId, TypeTag},
metadata::Metadata,
};
use move_vm_runtime::{
ambassador_impl_CodeStorage, ambassador_impl_ModuleStorage,
ambassador_impl_WithRuntimeEnvironment, AsUnsyncCodeStorage, BorrowedOrOwned, CodeStorage,
Module, ModuleStorage, RuntimeEnvironment, Script, UnsyncCodeStorage, UnsyncModuleStorage,
WithRuntimeEnvironment,
Function, Module, ModuleStorage, RuntimeEnvironment, Script, UnsyncCodeStorage,
UnsyncModuleStorage, WithRuntimeEnvironment,
};
use move_vm_types::{
code::{ModuleBytesStorage, ModuleCode},
loaded_data::runtime_types::{StructType, Type},
module_storage_error,
};
use std::{ops::Deref, sync::Arc};
Expand Down
18 changes: 6 additions & 12 deletions aptos-move/aptos-vm-types/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,33 +203,27 @@ pub trait StateStorageView {
/// TODO: audit and reconsider the default implementation (e.g. should not
/// resolve AggregatorV2 via the state-view based default implementation, as it
/// doesn't provide a value exchange functionality).
pub trait TExecutorView<K, T, L, I, V>:
pub trait TExecutorView<K, T, L, V>:
TResourceView<Key = K, Layout = L>
+ TModuleView<Key = K>
+ TAggregatorV1View<Identifier = K>
+ TDelayedFieldView<Identifier = I, ResourceKey = K, ResourceGroupTag = T>
+ TDelayedFieldView<Identifier = DelayedFieldID, ResourceKey = K, ResourceGroupTag = T>
+ StateStorageView<Key = K>
{
}

impl<A, K, T, L, I, V> TExecutorView<K, T, L, I, V> for A where
impl<A, K, T, L, V> TExecutorView<K, T, L, V> for A where
A: TResourceView<Key = K, Layout = L>
+ TModuleView<Key = K>
+ TAggregatorV1View<Identifier = K>
+ TDelayedFieldView<Identifier = I, ResourceKey = K, ResourceGroupTag = T>
+ TDelayedFieldView<Identifier = DelayedFieldID, ResourceKey = K, ResourceGroupTag = T>
+ StateStorageView<Key = K>
{
}

pub trait ExecutorView:
TExecutorView<StateKey, StructTag, MoveTypeLayout, DelayedFieldID, WriteOp>
{
}
pub trait ExecutorView: TExecutorView<StateKey, StructTag, MoveTypeLayout, WriteOp> {}

impl<T> ExecutorView for T where
T: TExecutorView<StateKey, StructTag, MoveTypeLayout, DelayedFieldID, WriteOp>
{
}
impl<T> ExecutorView for T where T: TExecutorView<StateKey, StructTag, MoveTypeLayout, WriteOp> {}

pub trait ResourceGroupView:
TResourceGroupView<GroupKey = StateKey, ResourceTag = StructTag, Layout = MoveTypeLayout>
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
.materialized_size()
}

fn get_write_summary(&self) -> HashSet<InputOutputKey<StateKey, StructTag, DelayedFieldID>> {
fn get_write_summary(&self) -> HashSet<InputOutputKey<StateKey, StructTag>> {
let vm_output = self.vm_output.lock();
let output = vm_output
.as_ref()
Expand Down
19 changes: 12 additions & 7 deletions aptos-move/aptos-vm/src/move_vm_ext/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ use move_core_types::{
vm_status::StatusCode,
};
use move_vm_runtime::{
move_vm::MoveVM, native_extensions::NativeContextExtensions, session::Session, ModuleStorage,
VerifiedModuleBundle,
move_vm::MoveVM, native_extensions::NativeContextExtensions, session::Session,
AsFunctionValueExtension, ModuleStorage, VerifiedModuleBundle,
};
use move_vm_types::{value_serde::serialize_and_allow_delayed_values, values::Value};
use move_vm_types::{value_serde::ValueSerDeContext, values::Value};
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -127,6 +127,7 @@ impl<'r, 'l> SessionExt<'r, 'l> {
module_storage: &impl ModuleStorage,
) -> VMResult<(VMChangeSet, ModuleWriteSet)> {
let move_vm = self.inner.get_move_vm();
let function_extension = module_storage.as_function_value_extension();

let resource_converter = |value: Value,
layout: MoveTypeLayout,
Expand All @@ -136,13 +137,17 @@ impl<'r, 'l> SessionExt<'r, 'l> {
// We allow serialization of native values here because we want to
// temporarily store native values (via encoding to ensure deterministic
// gas charging) in block storage.
serialize_and_allow_delayed_values(&value, &layout)?
ValueSerDeContext::new()
.with_delayed_fields_serde()
.with_func_args_deserialization(&function_extension)
.serialize(&value, &layout)?
.map(|bytes| (bytes.into(), Some(Arc::new(layout))))
} else {
// Otherwise, there should be no native values so ensure
// serialization fails here if there are any.
value
.simple_serialize(&layout)
ValueSerDeContext::new()
.with_func_args_deserialization(&function_extension)
.serialize(&value, &layout)?
.map(|bytes| (bytes.into(), None))
};
serialization_result.ok_or_else(|| {
Expand All @@ -165,7 +170,7 @@ impl<'r, 'l> SessionExt<'r, 'l> {

let table_context: NativeTableContext = extensions.remove();
let table_change_set = table_context
.into_change_set()
.into_change_set(&function_extension)
.map_err(|e| e.finish(Location::Undefined))?;

let aggregator_context: NativeAggregatorContext = extensions.remove();
Expand Down
43 changes: 21 additions & 22 deletions aptos-move/block-executor/src/captured_reads.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
code_cache_global::GlobalModuleCache, types::InputOutputKey,
value_exchange::filter_value_for_exchange,
};
use crate::{code_cache_global::GlobalModuleCache, types::InputOutputKey, view::LatestView};
use anyhow::bail;
use aptos_aggregator::{
delta_math::DeltaHistory,
Expand All @@ -21,15 +18,18 @@ use aptos_mvhashmap::{
};
use aptos_types::{
error::{code_invariant_error, PanicError, PanicOr},
executable::ModulePath,
state_store::state_value::StateValueMetadata,
executable::{Executable, ModulePath},
state_store::{state_value::StateValueMetadata, TStateView},
transaction::BlockExecutableTransaction as Transaction,
write_set::TransactionWrite,
};
use aptos_vm_types::resolver::ResourceGroupSize;
use derivative::Derivative;
use move_core_types::value::MoveTypeLayout;
use move_vm_types::code::{ModuleCode, SyncModuleCache, WithAddress, WithName, WithSize};
use move_vm_types::{
code::{ModuleCode, SyncModuleCache, WithAddress, WithName, WithSize},
delayed_values::delayed_field_id::DelayedFieldID,
};
use std::{
collections::{
hash_map::{
Expand Down Expand Up @@ -325,7 +325,7 @@ pub enum CacheRead<T> {
pub(crate) struct CapturedReads<T: Transaction, K, DC, VC, S> {
data_reads: HashMap<T::Key, DataRead<T::Value>>,
group_reads: HashMap<T::Key, GroupRead<T>>,
delayed_field_reads: HashMap<T::Identifier, DelayedFieldRead>,
delayed_field_reads: HashMap<DelayedFieldID, DelayedFieldRead>,

#[deprecated]
pub(crate) deprecated_module_reads: Vec<T::Key>,
Expand Down Expand Up @@ -359,9 +359,13 @@ where
S: WithSize,
{
// Return an iterator over the captured reads.
pub(crate) fn get_read_values_with_delayed_fields(
pub(crate) fn get_read_values_with_delayed_fields<
SV: TStateView<Key = T::Key>,
X: Executable,
>(
&self,
delayed_write_set_ids: &HashSet<T::Identifier>,
view: &LatestView<T, SV, X>,
delayed_write_set_ids: &HashSet<DelayedFieldID>,
skip: &HashSet<T::Key>,
) -> Result<BTreeMap<T::Key, (StateValueMetadata, u64, Arc<MoveTypeLayout>)>, PanicError> {
self.data_reads
Expand All @@ -372,7 +376,7 @@ where
}

if let DataRead::Versioned(_version, value, Some(layout)) = data_read {
filter_value_for_exchange::<T>(value, layout, delayed_write_set_ids, key)
view.filter_value_for_exchange(value, layout, delayed_write_set_ids, key)
} else {
None
}
Expand Down Expand Up @@ -511,7 +515,7 @@ where

pub(crate) fn capture_delayed_field_read(
&mut self,
id: T::Identifier,
id: DelayedFieldID,
update: bool,
read: DelayedFieldRead,
) -> Result<(), PanicOr<DelayedFieldsSpeculativeError>> {
Expand Down Expand Up @@ -571,7 +575,7 @@ where

pub(crate) fn get_delayed_field_by_kind(
&self,
id: &T::Identifier,
id: &DelayedFieldID,
min_kind: DelayedFieldReadKind,
) -> Option<DelayedFieldRead> {
self.delayed_field_reads
Expand Down Expand Up @@ -718,7 +722,7 @@ where
// (as it internally uses read_latest_predicted_value to get the current value).
pub(crate) fn validate_delayed_field_reads(
&self,
delayed_fields: &dyn TVersionedDelayedFieldView<T::Identifier>,
delayed_fields: &dyn TVersionedDelayedFieldView<DelayedFieldID>,
idx_to_validate: TxnIndex,
) -> Result<bool, PanicError> {
if self.delayed_field_speculative_failure {
Expand Down Expand Up @@ -779,9 +783,7 @@ where
K: Hash + Eq + Ord + Clone + WithAddress + WithName,
VC: Deref<Target = Arc<DC>>,
{
pub(crate) fn get_read_summary(
&self,
) -> HashSet<InputOutputKey<T::Key, T::Tag, T::Identifier>> {
pub(crate) fn get_read_summary(&self) -> HashSet<InputOutputKey<T::Key, T::Tag>> {
let mut ret = HashSet::new();
for (key, read) in &self.data_reads {
if let DataRead::Versioned(_, _, _) = read {
Expand Down Expand Up @@ -822,7 +824,7 @@ where
pub(crate) struct UnsyncReadSet<T: Transaction, K> {
pub(crate) resource_reads: HashSet<T::Key>,
pub(crate) group_reads: HashMap<T::Key, HashSet<T::Tag>>,
pub(crate) delayed_field_reads: HashSet<T::Identifier>,
pub(crate) delayed_field_reads: HashSet<DelayedFieldID>,

#[deprecated]
pub(crate) deprecated_module_reads: HashSet<T::Key>,
Expand All @@ -839,9 +841,7 @@ where
self.module_reads.insert(key);
}

pub(crate) fn get_read_summary(
&self,
) -> HashSet<InputOutputKey<T::Key, T::Tag, T::Identifier>> {
pub(crate) fn get_read_summary(&self) -> HashSet<InputOutputKey<T::Key, T::Tag>> {
let mut ret = HashSet::new();
for key in &self.resource_reads {
ret.insert(InputOutputKey::Resource(key.clone()));
Expand Down Expand Up @@ -1067,7 +1067,6 @@ mod test {

impl Transaction for TestTransactionType {
type Event = MockEvent;
type Identifier = DelayedFieldID;
type Key = KeyType<u32>;
type Tag = u32;
type Value = ValueType;
Expand Down
24 changes: 12 additions & 12 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use fail::fail_point;
use move_binary_format::CompiledModule;
use move_core_types::{language_storage::ModuleId, value::MoveTypeLayout, vm_status::StatusCode};
use move_vm_runtime::{Module, RuntimeEnvironment, WithRuntimeEnvironment};
use move_vm_types::code::ModuleCache;
use move_vm_types::{code::ModuleCache, delayed_values::delayed_field_id::DelayedFieldID};
use num_cpus;
use rayon::ThreadPool;
use std::{
Expand Down Expand Up @@ -115,7 +115,7 @@ where
incarnation: Incarnation,
signature_verified_block: &TP,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
executor: &E,
base_view: &S,
global_module_cache: &GlobalModuleCache<
Expand Down Expand Up @@ -406,7 +406,7 @@ where
Module,
AptosModuleExtension,
>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
scheduler: &Scheduler,
) -> bool {
let _timer = TASK_VALIDATE_SECONDS.start_timer();
Expand Down Expand Up @@ -436,7 +436,7 @@ where
fn update_transaction_on_abort(
txn_idx: TxnIndex,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
runtime_environment: &RuntimeEnvironment,
) {
counters::SPECULATIVE_ABORT_COUNT.inc();
Expand Down Expand Up @@ -490,7 +490,7 @@ where
valid: bool,
validation_wave: Wave,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
scheduler: &Scheduler,
runtime_environment: &RuntimeEnvironment,
) -> Result<SchedulerTask, PanicError> {
Expand Down Expand Up @@ -520,7 +520,7 @@ where
/// returns false (indicating that transaction needs to be re-executed).
fn validate_and_commit_delayed_fields(
txn_idx: TxnIndex,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
) -> Result<bool, PanicError> {
let read_set = last_input_output
Expand Down Expand Up @@ -563,7 +563,7 @@ where
&self,
block_gas_limit_type: &BlockGasLimitType,
scheduler: &Scheduler,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
scheduler_task: &mut SchedulerTask,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
shared_commit_state: &ExplicitSyncWrapper<BlockGasLimitProcessor<T>>,
Expand Down Expand Up @@ -766,7 +766,7 @@ where
Module,
AptosModuleExtension,
>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
scheduler: &Scheduler,
runtime_environment: &RuntimeEnvironment,
) -> Result<(), PanicError> {
Expand All @@ -788,7 +788,7 @@ where
fn materialize_aggregator_v1_delta_writes(
txn_idx: TxnIndex,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
base_view: &S,
) -> Vec<(T::Key, WriteOp)> {
// Materialize all the aggregator v1 deltas.
Expand Down Expand Up @@ -840,7 +840,7 @@ where
fn materialize_txn_commit(
&self,
txn_idx: TxnIndex,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
scheduler: &Scheduler,
start_shared_counter: u32,
shared_counter: &AtomicU32,
Expand Down Expand Up @@ -953,7 +953,7 @@ where
environment: &AptosEnvironment,
block: &TP,
last_input_output: &TxnLastInputOutput<T, E::Output, E::Error>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, T::Identifier>,
versioned_cache: &MVHashMap<T::Key, T::Tag, T::Value, X, DelayedFieldID>,
scheduler: &Scheduler,
// TODO: should not need to pass base view.
base_view: &S,
Expand Down Expand Up @@ -1274,7 +1274,7 @@ where
Module,
AptosModuleExtension,
>,
unsync_map: &UnsyncMap<T::Key, T::Tag, T::Value, T::Identifier>,
unsync_map: &UnsyncMap<T::Key, T::Tag, T::Value, DelayedFieldID>,
output: &E::Output,
resource_write_set: Vec<(T::Key, Arc<T::Value>, Option<Arc<MoveTypeLayout>>)>,
) -> Result<(), SequentialBlockExecutionError<E::Error>> {
Expand Down
7 changes: 2 additions & 5 deletions aptos-move/block-executor/src/limit_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ mod test {
proptest_types::types::{KeyType, MockEvent, MockTransaction},
types::InputOutputKey,
};
use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID;
use std::collections::HashSet;

// TODO: add tests for accumulate_fee_statement / compute_conflict_multiplier for different BlockGasLimitType configs
Expand Down Expand Up @@ -363,15 +362,13 @@ mod test {
assert!(processor.should_end_block_parallel());
}

fn to_map(
reads: &[InputOutputKey<u64, u32, u64>],
) -> HashSet<InputOutputKey<KeyType<u64>, u32, DelayedFieldID>> {
fn to_map(reads: &[InputOutputKey<u64, u32>]) -> HashSet<InputOutputKey<KeyType<u64>, u32>> {
reads
.iter()
.map(|key| match key {
InputOutputKey::Resource(k) => InputOutputKey::Resource(KeyType(*k, false)),
InputOutputKey::Group(k, t) => InputOutputKey::Group(KeyType(*k, false), *t),
InputOutputKey::DelayedField(i) => InputOutputKey::DelayedField((*i).into()),
InputOutputKey::DelayedField(i) => InputOutputKey::DelayedField(*i),
})
.collect()
}
Expand Down
Loading

0 comments on commit 2bea962

Please sign in to comment.