Multipart Upload Concurrency Issues #6460

Open
fsdvh opened this issue Sep 26, 2024 · 16 comments

Labels
question Further information is requested

Comments

@fsdvh
Contributor

fsdvh commented Sep 26, 2024

Describe the bug

So recently we started seeing two issues:

Multiple Shutdown

First of all, I want to note that multiple shutdown calls to the same writer are an issue in themselves, but I think we can make the situation better with minimal effort.

Here is the code:

    BufWriterState::Write(x) => {
        let upload = x.take().ok_or_else(|| {
            std::io::Error::new(
                ErrorKind::InvalidInput,
                "Cannot shutdown a writer that has already been shut down",
            )
        })?;
        self.state = BufWriterState::Flush(
            async move {
                upload.finish().await?;
                Ok(())
            }
            .boxed(),
        )
    }

I think we can change it to something more friendly like this:

    BufWriterState::Write(x) => {
        if let Some(upload) = x.take() {
            self.state = BufWriterState::Flush(
                async move { upload.finish().await.map(|_| ()) }.boxed(),
            )
        } else {
            return Poll::Ready(Ok(()));
        }
    }

This way, on a second shutdown call, we just immediately return Ok(()).
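
For illustration, here is a hypothetical caller-side sketch (not from the issue; it assumes a tokio runtime and object_store's buffered BufWriter) where shutdown ends up being called twice, e.g. once explicitly and once from generic cleanup code:

    use tokio::io::AsyncWriteExt;

    // Hypothetical example: `writer` is an object_store::buffered::BufWriter.
    async fn close_twice(mut writer: object_store::buffered::BufWriter) -> std::io::Result<()> {
        writer.write_all(b"some data").await?;
        // The first shutdown uploads the buffered data and finalizes the upload.
        writer.shutdown().await?;
        // Today a second shutdown (e.g. from generic cleanup code) fails with
        // "Cannot shutdown a writer that has already been shut down";
        // with the proposed change it would return Ok(()) instead.
        writer.shutdown().await
    }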

Upload part size issue

Something left over in the buffer during the shutdown may complete before the previous uploads; in this case, we're getting:

Your proposed upload is smaller than the minimum allowed size

To mitigate this issue, we should probably wait for all previous part uploads to complete and only then upload the final part, which is allowed to be smaller than the minimum part size.

Here is the original code I propose to change:

    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }

By injecting self.wait_for_capacity(0).await?; before actually putting the last chunk, we can mitigate this issue:

    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            self.wait_for_capacity(0).await?; // here

            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }

This way we wait for all ongoing uploads to complete before submitting the last part.

@fsdvh fsdvh added the bug label Sep 26, 2024
@fsdvh fsdvh changed the title Small Object Store API improvements Object Store API improvements Sep 26, 2024
@fsdvh
Contributor Author

fsdvh commented Sep 26, 2024

I think it's more of an enhancement.

@tustvold tustvold added enhancement Any new improvement worthy of a entry in the changelog and removed bug labels Sep 26, 2024
@tustvold
Contributor

tustvold commented Sep 26, 2024

What is the use-case for multiple shutdowns? This feels like a bug in the calling code.

A similar argument could be made for calling shutdown before the parts have been uploaded; I think this implies the caller is not waiting on the returned futures?

I'm actually confused by the second one; could you provide more context on how you encountered this error, including which store implementation you are using? Parts are numbered, so it shouldn't matter if they complete out of order.

@fsdvh
Contributor Author

fsdvh commented Sep 26, 2024

Yes, calling shutdown multiple times is a bug on the caller side for sure. The changes I proposed should help mitigate this issue a bit by making shutdown calls more relaxed.

For the second one, we're using the S3 object store, and for some reason it (S3) doesn't take the part number into account during the multipart upload and returns the error mentioned above. We managed to solve this issue by calling self.wait_for_capacity(0).await? before calling finish().
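
For reference, the caller-side workaround looks roughly like this (a minimal sketch assuming object_store's public WriteMultipart API; store, path and chunks are placeholder names):

    use object_store::path::Path;
    use object_store::{ObjectStore, WriteMultipart};

    async fn upload_chunks(
        store: &dyn ObjectStore,
        path: &Path,
        chunks: Vec<Vec<u8>>,
    ) -> object_store::Result<()> {
        let upload = store.put_multipart(path).await?;
        let mut writer = WriteMultipart::new(upload);
        for chunk in &chunks {
            writer.write(chunk); // queues part uploads as the internal buffer fills
        }
        // Workaround: wait for all in-flight part uploads before finish()
        // submits the (possibly undersized) final buffered part.
        writer.wait_for_capacity(0).await?;
        writer.finish().await?;
        Ok(())
    }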

@tustvold
Contributor

The changes I proposed should help mitigate this issue a bit by making shutdown calls more relaxed

Your proposed change is racy: the first close will wait on the upload to complete, with subsequent calls "completing" instantly.

For the second one, we're using the S3 object store, and for some reason it (S3) doesn't take the part number into account during the multipart upload and returns the error mentioned above. We managed to solve this issue by calling self.wait_for_capacity(0).await? before calling finish().

Perhaps you could write a simple reproducer for this; I'm not saying S3 doesn't do this, but I want to be sure we've correctly identified the issue. Your proposed fix will serialize a round-trip, which is unfortunate when many stores are not exactly low-latency.

@fsdvh
Contributor Author

fsdvh commented Sep 26, 2024

I will try to provide an example of the second issue, but in the meantime I thought maybe we could change the flush method of BufWriter:

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        loop {
            return match &mut self.state {
                BufWriterState::Write(write) => {
                    if let Some(write) = write {
                        write.poll_for_capacity(cx, 0).map_err(|e| e.into())
                    } else {
                        panic!("Already shut down")
                    }
                }
                BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
                BufWriterState::Flush(_) => panic!("Already shut down"),
                BufWriterState::Prepare(f) => {
                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                    continue;
                }
            };
        }
    }

By actually waiting for all in-flight uploads to complete, we can give the user the ability to use flush() + shutdown(), wdyt?
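
For context, this is the kind of caller pattern such a change would enable (a hypothetical sketch, assuming tokio's AsyncWriteExt on top of object_store::buffered::BufWriter):

    use tokio::io::AsyncWriteExt;

    async fn write_flush_close(
        mut writer: object_store::buffered::BufWriter,
        data: &[u8],
    ) -> std::io::Result<()> {
        writer.write_all(data).await?;
        // With the proposed poll_flush, flush() would wait for the in-flight
        // part uploads to complete...
        writer.flush().await?;
        // ...so shutdown() only has to upload the final buffered part and
        // complete the multipart upload.
        writer.shutdown().await
    }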

@tustvold
Contributor

What would be the benefit of this? It wouldn't be able to guarantee that there isn't any data still in flight, as it can't upload the final data until shutdown is called. It is a valid point that we're taking a somewhat dubious interpretation of the AsyncWrite trait, but it's a necessary evil. We could probably add further documentation to discourage the use of AsyncWrite.

@fsdvh
Contributor Author

fsdvh commented Sep 26, 2024

Yes, but flush will ensure that ongoing writes are flushed, which is generally aligned with "flush". And after that, you're okay to finalize your upload in the case of shutdown.

@tustvold
Contributor

will ensure that ongoing writes are flushed, which is generally aligned with "flush"

Only those it has actually started writing; I think the proposed behaviour is more confusing. We should just make shutdown do the right thing.

@fsdvh
Contributor Author

fsdvh commented Oct 2, 2024

@tustvold Sorry for the late response; I was trying to find a reproducer for this issue, unfortunately without success so far. If you think it would be okay to just update the shutdown, that would be great.

@fsdvh
Contributor Author

fsdvh commented Oct 2, 2024

So the proposed change would be a one-liner here:

    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            self.wait_for_capacity(0).await?; // here

            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }

@tustvold
Contributor

tustvold commented Oct 2, 2024

This will serialize a round-trip, adding latency to callers of finish; I would therefore be apprehensive about making this change without a reproducer of the issue it is seeking to fix.

@fsdvh
Contributor Author

fsdvh commented Oct 2, 2024

@tustvold makes sense

@fsdvh
Contributor Author

fsdvh commented Oct 18, 2024

I managed to reproduce the issue in our staging environment and found the problem that was causing it, even though I can't fully explain it.

We have code that uploads data to the S3 store in chunks; we're using a combination of MultipartUpload + WriteMultipart.

Here is the code for MultipartUpload for S3:

    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let idx = self.part_idx;
        self.part_idx += 1;
        let state = Arc::clone(&self.state);
        Box::pin(async move {
            let part = state
                .client
                .put_part(&state.location, &state.upload_id, idx, data)
                .await?;
            state.parts.put(idx, part);
            Ok(())
        })
    }

In short, we synchronously increment the index for a writer (using the &mut ref), which should protect us from races, and then create a future to write the part.

We call this method here in WriteMultipart when we have a chunk of the data to write.

    pub(crate) fn put_part(&mut self, part: PutPayload) {
        self.tasks.spawn(self.upload.put_part(part));
    }

We're spawning a new task to upload the part to S3 and also assigning the idx for this part. Everything looks correct and safe, as we are protected by the &mut ref; in my understanding, the synchronous part of put_part is executed immediately, without waiting for the future to be polled.

During the investigation I added a lot of "logging" to our fork of object-store and found out a very weird thing. From time to time (I would even say often) I observed things like this:

(last -1) -> 1729195754539115 before spawn; 1729195754655545 after spawn
last      -> 1729195754539834 before spawn; 1729195754539847 after spawn

In short, even though the last write before calling the complete method was issued in the correct order, the execution of the spawned upload task was scheduled in a different order, which is okay in general since we hand it over to the runtime scheduler. But weirdly enough, the same seemed to apply to the synchronous part of the put_part code, meaning that the last part, with a size smaller than what S3 requires, was assigned the last - 1 index, causing the whole upload to fail during completion.

I don't know why it's happening like that; I will probably spend some more time trying to figure it out, but the solution for this problem is quite simple:

    pub(crate) fn put_part(&mut self, part: PutPayload) {
        let task = self.upload.put_part(part);
        self.tasks.spawn(task);
    }

i.e. create the task future outside of the spawn call to guarantee that we assign a proper index to the part, and only then submit the task.

Please let me know what you think @tustvold 🙇

@tustvold
Contributor

tustvold commented Oct 18, 2024

I don't know why it's happening like that; I will probably spend some more time trying to figure it out, but the solution for this problem is quite simple:

The two code samples are equivalent...
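
In Rust the argument expression is fully evaluated before spawn runs, so the synchronous index increment inside put_part executes in call order in both versions. A minimal standalone sketch of this (made-up names, assuming tokio; not object_store code):

    use std::future::Future;
    use std::pin::Pin;

    struct Parts {
        part_idx: usize,
    }

    impl Parts {
        // Mirrors the shape of put_part: the index increment is synchronous,
        // only the returned boxed future is deferred until the task is polled.
        fn put_part(&mut self) -> Pin<Box<dyn Future<Output = usize> + Send>> {
            let idx = self.part_idx;
            self.part_idx += 1;
            Box::pin(async move { idx })
        }
    }

    #[tokio::main]
    async fn main() {
        let mut parts = Parts { part_idx: 0 };
        // `parts.put_part()` is evaluated (and the index assigned) before
        // tokio::spawn is entered, exactly as if the future were first bound
        // to a variable and then spawned.
        let a = tokio::spawn(parts.put_part());
        let b = tokio::spawn(parts.put_part());
        assert_eq!((a.await.unwrap(), b.await.unwrap()), (0, 1));
    }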

During coralogix#54 I added a lot of "logging" to our fork of object-store and found out a very weird thing. From time to time (I would even say often) I observed things like this:

We have code that uploads data to the S3 store in chunks; we're using a combination of MultipartUpload + WriteMultipart.

Are you sure this originates from WriteMultipart and not something using MultipartUpload directly? The MultipartUpload trait has a number of invariants that must be maintained by callers, which I could see causing these sorts of issues if they were broken.

Edit: In fact re-reading the initial issue description, you stated you were seeing issues with multiple shutdowns. This almost certainly points to misuse of MultipartUpload as being the culprit, as WriteMultipart consumes on shutdown.
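
For context, a rough sketch of what driving MultipartUpload by hand has to look like, since those invariants are the caller's responsibility (placeholder names, assuming the futures crate):

    use futures::future::try_join_all;
    use object_store::{MultipartUpload, PutPayload};

    // Sketch only: every UploadPart future must be driven to completion before
    // complete() is called, and complete()/abort() should be called at most once.
    async fn upload_all_parts(
        mut upload: Box<dyn MultipartUpload>,
        parts: Vec<PutPayload>,
    ) -> object_store::Result<()> {
        let part_futures: Vec<_> = parts.into_iter().map(|p| upload.put_part(p)).collect();
        try_join_all(part_futures).await?; // wait for every part before completing
        upload.complete().await?;
        Ok(())
    }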

@tustvold tustvold added question Further information is requested and removed enhancement Any new improvement worthy of a entry in the changelog labels Oct 18, 2024
@tustvold tustvold changed the title Object Store API improvements Multipart Upload Concurrency Issues Oct 18, 2024
@fsdvh
Contributor Author

fsdvh commented Oct 22, 2024

Yes, you are right @tustvold.

Here is the revisited proposal. This would make the last part upload sequential with respect to all other parts, but I don't think it will do a lot of harm performance-wise, while we get ordering guarantees.

LMK what you think

@tustvold
Contributor

tustvold commented Oct 22, 2024

This is the same proposal as before; can you please articulate what problem you are trying to fix, with a reproducer?

#6460 (comment)

The fact that you're unable to reproduce this makes it very hard for me not to think this is a concurrency issue in your codebase.
