Skip to content

Commit

Permalink
Revert "To revert: failed attempt to shrink TreeNodes by removing the…
Browse files Browse the repository at this point in the history
… tx channel"

This reverts commit ef06491.
  • Loading branch information
SUPERCILEX committed Dec 26, 2023
1 parent ef06491 commit 87a51b4
Showing 1 changed file with 15 additions and 52 deletions.
67 changes: 15 additions & 52 deletions fuc_engine/src/ops/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,8 @@ fn schedule_deletions<'a, I: Into<Cow<'a, Path>>, F: IntoIterator<Item = I>>(
#[cfg(target_os = "linux")]
mod compat {
use std::{
borrow::Cow,
cell::{LazyCell, UnsafeCell},
ffi::CString,
mem::MaybeUninit,
num::NonZeroUsize,
path::Path,
sync::Arc,
thread,
thread::JoinHandle,
borrow::Cow, cell::LazyCell, ffi::CString, mem::MaybeUninit, num::NonZeroUsize, path::Path,
sync::Arc, thread, thread::JoinHandle,
};

use crossbeam_channel::{Receiver, Sender};
Expand All @@ -144,7 +137,7 @@ mod compat {
pub fn remove_impl<'a>() -> impl DirectoryOp<Cow<'a, Path>> {
let scheduling = LazyCell::new(|| {
let (tx, rx) = crossbeam_channel::unbounded();
(tx.clone(), thread::spawn(|| root_worker_thread(tx, rx)))
(tx, thread::spawn(|| root_worker_thread(rx)))
});

Impl { scheduling }
Expand All @@ -161,6 +154,7 @@ mod compat {
.send(Message::Node(TreeNode {
path: path_buf_to_cstring(dir.into_owned())?,
_parent: None,
messages: tasks.clone(),
}))
.map_err(|_| Error::Internal)
}
Expand All @@ -176,34 +170,7 @@ mod compat {
}
}

thread_local! {
static MESSAGES: UnsafeCell<MaybeUninit<Sender<Message>>> =
const { UnsafeCell::new(MaybeUninit::uninit()) };
}

struct TxTlsGuard;

impl Drop for TxTlsGuard {
fn drop(&mut self) {
MESSAGES.with(|tls| unsafe {
(*tls.get()).assume_init_drop();
});
}
}

#[must_use]
fn init_tx_tls(messages: Sender<Message>) -> TxTlsGuard {
MESSAGES.with(|tls| unsafe {
*tls.get() = MaybeUninit::new(messages);
});

TxTlsGuard
}

fn root_worker_thread(
messages: Sender<Message>,
tasks: Receiver<Message>,
) -> Result<(), Error> {
fn root_worker_thread(tasks: Receiver<Message>) -> Result<(), Error> {
let mut available_parallelism = thread::available_parallelism()
.map(NonZeroUsize::get)
.unwrap_or(1)
Expand All @@ -213,26 +180,24 @@ mod compat {
let mut threads = Vec::with_capacity(available_parallelism);

{
let tls_guard = init_tx_tls(messages.clone());
let mut buf = [MaybeUninit::<u8>::uninit(); 8192];
for message in &tasks {
let mut maybe_spawn = || {
if available_parallelism > 0 && !tasks.is_empty() {
available_parallelism -= 1;
threads.push(scope.spawn({
let tasks = tasks.clone();
|| worker_thread(&messages, tasks)
|| worker_thread(tasks)
}));
}
};
maybe_spawn();

match message {
Message::Node(node) => delete_dir(&messages, node, &mut buf, maybe_spawn)?,
Message::Node(node) => delete_dir(node, &mut buf, maybe_spawn)?,
Message::Error(e) => return Err(e),
}
}
drop((tasks, tls_guard));
}

for thread in threads {
Expand All @@ -242,22 +207,20 @@ mod compat {
})
}

fn worker_thread(messages: &Sender<Message>, tasks: Receiver<Message>) -> Result<(), Error> {
fn worker_thread(tasks: Receiver<Message>) -> Result<(), Error> {
unshare(UnshareFlags::FILES).map_io_err(|| "Failed to unshare FD table.".to_string())?;

let _guard = init_tx_tls(messages.clone());
let mut buf = [MaybeUninit::<u8>::uninit(); 8192];
for message in tasks {
match message {
Message::Node(node) => delete_dir(messages, node, &mut buf, || {})?,
Message::Node(node) => delete_dir(node, &mut buf, || {})?,
Message::Error(e) => return Err(e),
}
}
Ok(())
}

fn delete_dir(
messages: &Sender<Message>,
node: TreeNode,
buf: &mut [MaybeUninit<u8>],
mut maybe_spawn: impl FnMut(),
Expand Down Expand Up @@ -287,10 +250,11 @@ mod compat {
};
if file_type == FileType::Directory {
maybe_spawn();
messages
node.messages
.send(Message::Node(TreeNode {
path: concat_cstrs(&node.path, file.file_name()),
_parent: Some(node.clone()),
messages: node.messages.clone(),
}))
.map_err(|_| Error::Internal)?;
} else {
Expand All @@ -314,23 +278,22 @@ mod compat {
path: CString,
// Needed for the recursive drop implementation
_parent: Option<Arc<TreeNode>>,
messages: Sender<Message>,
}

impl Drop for TreeNode {
fn drop(&mut self) {
let Self {
ref path,
_parent: _,
ref messages,
} = *self;

if let Err(e) = unlinkat(CWD, path, AtFlags::REMOVEDIR)
.map_io_err(|| format!("Failed to delete directory: {path:?}"))
{
MESSAGES.with(|tls| {
// Ignore the result because if the receiver closed, then another error must
// have already occurred.
let _ = unsafe { (*tls.get()).assume_init_ref() }.send(Message::Error(e));
});
// If the receiver closed, then another error must have already occurred.
drop(messages.send(Message::Error(e)));
}
}
}
Expand Down

0 comments on commit 87a51b4

Please sign in to comment.