Skip to content

Commit

Permalink
chore: finalize azure file upload (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
romange authored Dec 28, 2024
1 parent 9bd97e6 commit aabaa93
Showing 1 changed file with 49 additions and 3 deletions.
52 changes: 49 additions & 3 deletions util/cloud/azure/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <absl/strings/str_cat.h>
#include <absl/strings/strip.h>

#include <boost/asio/buffer.hpp>
#include <boost/beast/http/buffer_body.hpp>
#include <boost/beast/http/string_body.hpp>
#include <pugixml.hpp>
Expand Down Expand Up @@ -215,7 +216,10 @@ class WriteFile : public detail::AbstractStorageFile {
error_code Upload();

using UploadRequest = detail::DynamicBodyRequestImpl;
unique_ptr<UploadRequest> PrepareRequest();
using UploadBlockListRequest = detail::DynamicBodyRequestImpl;

unique_ptr<UploadRequest> PrepareUploadBlockReq();
unique_ptr<UploadBlockListRequest> PrepareBlockListReq();

unique_ptr<http::ClientPool> pool_; // must be before client_handle_.
string target_;
Expand Down Expand Up @@ -308,14 +312,29 @@ WriteFile::~WriteFile() {
}

error_code WriteFile::Close() {
if (body_mb_.size() > 0) {
RETURN_ERROR(Upload());
}
DCHECK_EQ(body_mb_.size(), 0u);
auto req = PrepareBlockListReq();

error_code res;
RobustSender sender(pool_.get(), opts_.creds_provider);
RobustSender::SenderResult send_res;
RETURN_ERROR(sender.Send(3, req.get(), &send_res));

auto parser_ptr = std::move(send_res.eb_parser);
const auto& resp_msg = parser_ptr->get();
VLOG(1) << "Close response: " << resp_msg;

return {};
}

error_code WriteFile::Upload() {
size_t body_size = body_mb_.size();
CHECK_GT(body_size, 0u);

auto req = PrepareRequest();
auto req = PrepareUploadBlockReq();

error_code res;
RobustSender sender(pool_.get(), opts_.creds_provider);
Expand All @@ -329,7 +348,7 @@ error_code WriteFile::Upload() {
return {};
}

auto WriteFile::PrepareRequest() -> unique_ptr<UploadRequest> {
auto WriteFile::PrepareUploadBlockReq() -> unique_ptr<UploadRequest> {
string url =
absl::StrCat(target_, "?comp=block&blockid=", absl::Dec(block_id_++, absl::kZeroPad4));
unique_ptr<UploadRequest> upload_req(new UploadRequest(url, h2::verb::put));
Expand All @@ -343,6 +362,33 @@ auto WriteFile::PrepareRequest() -> unique_ptr<UploadRequest> {
return upload_req;
}

auto WriteFile::PrepareBlockListReq() -> unique_ptr<UploadBlockListRequest> {
string url = absl::StrCat(target_, "?comp=blocklist");
unique_ptr<UploadBlockListRequest> upload_req(new UploadBlockListRequest(url, h2::verb::put));

boost::beast::multi_buffer mb;

string body = R"(<?xml version="1.0" encoding="utf-8"?><BlockList>)";

for (unsigned i = 1; i < block_id_; ++i) {
absl::StrAppend(&body, "\n<Uncommitted>", absl::Dec(i, absl::kZeroPad4), "</Uncommitted>");
}
absl::StrAppend(&body, "\n</BlockList>\n");

auto buf_list = mb.prepare(body.size());
size_t res = boost::asio::buffer_copy(buf_list, boost::asio::buffer(body));
DCHECK_EQ(res, body.size());
mb.commit(body.size());

upload_req->SetBody(std::move(mb));

upload_req->SetHeader(h2::field::host, opts_.creds_provider->GetEndpoint());
upload_req->Finalize();
opts_.creds_provider->Sign(upload_req.get());

return upload_req;
}

} // namespace

error_code Storage::ListContainers(function<void(const ContainerItem&)> cb) {
Expand Down

0 comments on commit aabaa93

Please sign in to comment.