From df9a394fcb9f02606c7dce2b0436ebab179343f6 Mon Sep 17 00:00:00 2001 From: konstin Date: Wed, 17 Jan 2024 13:47:23 +0100 Subject: [PATCH] Box large futures to reduce stack sizes --- crates/puffin-client/src/cached_client.rs | 5 +++-- .../src/distribution_database.rs | 8 +++++-- crates/puffin-distribution/src/source/mod.rs | 22 +++++++++++++++---- crates/puffin-installer/src/downloader.rs | 5 +++-- crates/puffin-resolver/src/resolver/mod.rs | 5 ++++- crates/puffin-traits/src/lib.rs | 2 +- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/crates/puffin-client/src/cached_client.rs b/crates/puffin-client/src/cached_client.rs index 55f243c36a941..514d5e80b0e51 100644 --- a/crates/puffin-client/src/cached_client.rs +++ b/crates/puffin-client/src/cached_client.rs @@ -1,3 +1,4 @@ +use futures::FutureExt; use std::future::Future; use std::time::SystemTime; @@ -88,7 +89,7 @@ impl CachedClient { /// client. #[instrument(skip_all)] pub async fn get_cached_with_callback< - Payload: Serialize + DeserializeOwned, + Payload: Serialize + DeserializeOwned + Send, CallBackError, Callback, CallbackReturn, @@ -128,7 +129,7 @@ impl CachedClient { None }; - let cached_response = self.send_cached(req, cached).await?; + let cached_response = self.send_cached(req, cached).boxed().await?; let write_cache = info_span!("write_cache", file = %cache_entry.path().display()); match cached_response { diff --git a/crates/puffin-distribution/src/distribution_database.rs b/crates/puffin-distribution/src/distribution_database.rs index 21530c6d5c332..38ce414b216e7 100644 --- a/crates/puffin-distribution/src/distribution_database.rs +++ b/crates/puffin-distribution/src/distribution_database.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use std::sync::Arc; use fs_err::tokio as fs; +use futures::FutureExt; use thiserror::Error; use tokio::task::JoinError; use tokio_util::compat::FuturesAsyncReadCompatExt; @@ -218,7 +219,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> let lock = self.locks.acquire(&dist).await; let _guard = lock.lock().await; - let built_wheel = self.builder.download_and_build(source_dist).await?; + let built_wheel = self.builder.download_and_build(source_dist).boxed().await?; Ok(LocalWheel::Built(BuiltWheel { dist: dist.clone(), path: built_wheel.path, @@ -241,7 +242,9 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> dist: &Dist, ) -> Result<(Metadata21, Option), DistributionDatabaseError> { match dist { - Dist::Built(built_dist) => Ok((self.client.wheel_metadata(built_dist).await?, None)), + Dist::Built(built_dist) => { + Ok((self.client.wheel_metadata(built_dist).boxed().await?, None)) + } Dist::Source(source_dist) => { // Optimization: Skip source dist download when we must not build them anyway. if self.build_context.no_build() { @@ -262,6 +265,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> let metadata = self .builder .download_and_build_metadata(&source_dist) + .boxed() .await?; Ok((metadata, precise)) } diff --git a/crates/puffin-distribution/src/source/mod.rs b/crates/puffin-distribution/src/source/mod.rs index 733c18d654b55..27573a722211c 100644 --- a/crates/puffin-distribution/src/source/mod.rs +++ b/crates/puffin-distribution/src/source/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use anyhow::Result; use fs_err::tokio as fs; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use reqwest::Response; use tempfile::TempDir; use tokio_util::compat::FuturesAsyncReadCompatExt; @@ -95,6 +95,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, subdirectory.as_deref(), ) + .boxed() .await? } SourceDist::Registry(registry_source_dist) => { @@ -131,6 +132,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, None, ) + .boxed() .await? } SourceDist::Git(git_source_dist) => self.git(source_dist, git_source_dist).await?, @@ -168,6 +170,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, subdirectory.as_deref(), ) + .boxed() .await? } SourceDist::Registry(registry_source_dist) => { @@ -184,7 +187,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { path: path.clone(), editable: false, }; - return self.path_metadata(source_dist, &path_source_dist).await; + return self + .path_metadata(source_dist, &path_source_dist) + .boxed() + .await; } }; @@ -204,13 +210,18 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, None, ) + .boxed() .await? } SourceDist::Git(git_source_dist) => { - self.git_metadata(source_dist, git_source_dist).await? + self.git_metadata(source_dist, git_source_dist) + .boxed() + .await? } SourceDist::Path(path_source_dist) => { - self.path_metadata(source_dist, path_source_dist).await? + self.path_metadata(source_dist, path_source_dist) + .boxed() + .await? } }; @@ -375,6 +386,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self .build_source_dist_metadata(source_dist, source_dist_entry.path(), subdirectory) + .boxed() .await? { if let Ok(cached) = fs::read(cache_entry.path()).await { @@ -581,6 +593,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self .build_source_dist_metadata(source_dist, &path_source_dist.path, None) + .boxed() .await? { // Store the metadata for this build along with all the other builds. @@ -729,6 +742,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self .build_source_dist_metadata(source_dist, fetch.path(), subdirectory.as_deref()) + .boxed() .await? { // Store the metadata for this build along with all the other builds. diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index ca716adc898fe..a842cdca60ade 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -2,7 +2,7 @@ use std::cmp::Reverse; use std::path::Path; use std::sync::Arc; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use tokio::task::JoinError; use tracing::{instrument, warn}; use url::Url; @@ -68,7 +68,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { ) -> impl Stream> + 'stream { futures::stream::iter(distributions) .map(|dist| async { - let wheel = self.get_wheel(dist, in_flight).await?; + let wheel = self.get_wheel(dist, in_flight).boxed().await?; if let Some(reporter) = self.reporter.as_ref() { reporter.on_progress(&wheel); } @@ -158,6 +158,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { let download: LocalWheel = self .database .get_or_build_wheel(dist.clone()) + .boxed() .map_err(|err| Error::Fetch(dist.clone(), err)) .await?; let result = Self::unzip_wheel(download).await; diff --git a/crates/puffin-resolver/src/resolver/mod.rs b/crates/puffin-resolver/src/resolver/mod.rs index cd5dcd193cfbc..b8ec28db08686 100644 --- a/crates/puffin-resolver/src/resolver/mod.rs +++ b/crates/puffin-resolver/src/resolver/mod.rs @@ -683,7 +683,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { /// Fetch the metadata for a stream of packages and versions. async fn fetch(&self, request_stream: UnboundedReceiver) -> Result<(), ResolveError> { let mut response_stream = request_stream - .map(|request| self.process_request(request)) + .map(|request| self.process_request(request).boxed()) .buffer_unordered(50); while let Some(response) = response_stream.next().await { @@ -738,6 +738,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let version_map = self .provider .get_version_map(&package_name) + .boxed() .await .map_err(ResolveError::Client)?; Ok(Some(Response::Package(package_name, version_map))) @@ -748,6 +749,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let (metadata, precise) = self .provider .get_or_build_wheel_metadata(&dist) + .boxed() .await .map_err(|err| match dist.clone() { Dist::Built(BuiltDist::Path(built_dist)) => { @@ -800,6 +802,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let (metadata, precise) = self .provider .get_or_build_wheel_metadata(&dist) + .boxed() .await .map_err(|err| match dist.clone() { Dist::Built(BuiltDist::Path(built_dist)) => { diff --git a/crates/puffin-traits/src/lib.rs b/crates/puffin-traits/src/lib.rs index 0641e33b1871a..fea1d9180d5ef 100644 --- a/crates/puffin-traits/src/lib.rs +++ b/crates/puffin-traits/src/lib.rs @@ -51,7 +51,7 @@ use puffin_interpreter::{Interpreter, Virtualenv}; /// them. // TODO(konstin): Proper error types -pub trait BuildContext { +pub trait BuildContext: Sync { type SourceDistBuilder: SourceBuildTrait + Send + Sync; fn cache(&self) -> &Cache;