Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]curvefs/client: add check in cache #2888

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions curvefs/src/client/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype) {

const char kCurveFsWarmupOpAdd[] = "add";
const char kCurveFsWarmupOpCancel[] = "cancel";
const char kCurveFsWarmupOpCheck[] = "check";
const char kCurveFsWarmupTypeList[] = "list";
const char kCurveFsWarmupTypeSingle[] = "single";

Expand All @@ -89,6 +90,8 @@ WarmupOpType GetWarmupOpType(const std::string& op) {
ret = WarmupOpType::kWarmupOpAdd;
} else if (op == kCurveFsWarmupOpCancel) {
ret = WarmupOpType::kWarmupOpCancel;
} else if (op == kCurveFsWarmupOpCheck) {
ret = WarmupOpType::kWarmupOpCheck;
}
return ret;
}
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ std::ostream &operator<<(std::ostream &os, MetaServerOpType optype);
constexpr size_t kMinWarmupOpArgsNum = 1;
constexpr size_t kWarmupAddArgsNum = 6;
constexpr size_t kWarmupCancelArgsNum = 2;
constexpr size_t kWarmupCheckArgsNum = 6;

enum class WarmupOpType {
kWarmupOpUnknown = 0,
kWarmupOpAdd = 1,
kWarmupOpCancel = 2,
kWarmupOpCheck = 3,
};

WarmupOpType GetWarmupOpType(const std::string& op);
Expand Down
51 changes: 44 additions & 7 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

#include "curvefs/src/client/curve_fuse_op.h"

#include <fmt/format.h>

#include <cstring>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -66,6 +69,7 @@ using ::curvefs::client::common::WarmupStorageType;
using ::curvefs::client::filesystem::AttrOut;
using ::curvefs::client::filesystem::EntryOut;
using ::curvefs::client::filesystem::FileOut;
using ::curvefs::client::filesystem::IsCheckXAttr;
using ::curvefs::client::filesystem::IsListWarmupXAttr;
using ::curvefs::client::filesystem::IsWarmupXAttr;
using ::curvefs::client::filesystem::StrAttr;
Expand Down Expand Up @@ -237,16 +241,18 @@ void UnInitFuseClient() {
int AddWarmupTask(curvefs::client::common::WarmupType type, fuse_ino_t key,
const std::string& path,
curvefs::client::common::WarmupStorageType storageType,
const std::string& mount_point, const std::string& root) {
const std::string& mount_point, const std::string& root,
bool check = false) {
int ret = 0;
bool result = true;
switch (type) {
case curvefs::client::common::WarmupType::kWarmupTypeList:
result = g_ClientInstance->PutWarmFilelistTask(key, storageType, path,
mount_point, root);
break;
result = g_ClientInstance->PutWarmFilelistTask(
key, storageType, path, mount_point, root, check);
break;
case curvefs::client::common::WarmupType::kWarmupTypeSingle:
result = g_ClientInstance->PutWarmFileTask(key, path, storageType);
result =
g_ClientInstance->PutWarmFileTask(key, path, storageType, check);
break;
default:
// not support add warmup type (warmup single file/dir or filelist)
Expand Down Expand Up @@ -291,6 +297,18 @@ void QueryWarmupTask(fuse_ino_t key, std::string *data) {
VLOG(9) << "Warmup [" << key << "]" << *data;
}

void QueryCheckCachedTask(fuse_ino_t key, std::string* data) {
WarmupProgress progress;
bool ret = g_ClientInstance->GetCheckCachedProgress(key, &progress);
if (!ret) {
*data = "no check task or not warmup yet";
} else {
*data =
fmt::format("{}/{}", progress.GetFinished(), progress.GetTotal());
}
VLOG(9) << "check cached [" << key << "]" << *data;
}

void ListWarmupTasks(std::string* data) {
WarmupProgress progress;
std::unordered_map<std::string, WarmupProgress> filepath2progress;
Expand Down Expand Up @@ -344,9 +362,15 @@ int Warmup(fuse_ino_t key, const char* name, const std::string& values) {
}

int ret = 0;
bool check = false;
if (curvefs::client::common::GetWarmupOpType(warmupOpType) ==
curvefs::client::common::WarmupOpType::kWarmupOpCheck) {
check = true;
}

switch (curvefs::client::common::GetWarmupOpType(warmupOpType)) {
case curvefs::client::common::WarmupOpType::kWarmupOpAdd: {
case curvefs::client::common::WarmupOpType::kWarmupOpCheck:
case curvefs::client::common::WarmupOpType::kWarmupOpAdd : {
if (opTypePath.size() !=
curvefs::client::common::kWarmupAddArgsNum) {
LOG(ERROR)
Expand All @@ -373,7 +397,7 @@ int Warmup(fuse_ino_t key, const char* name, const std::string& values) {
ret = AddWarmupTask(
curvefs::client::common::GetWarmupType(warmupDataType), key,
entryFilePathInClient, storageType, mountPointInCurvefs,
rootPathInCurvefs);
rootPathInCurvefs, check);
break;
}
case curvefs::client::common::WarmupOpType::kWarmupOpCancel: {
Expand Down Expand Up @@ -446,6 +470,17 @@ void QueryWarmup(fuse_req_t req, fuse_ino_t ino, size_t size) {
return fs->ReplyBuffer(req, data.data(), data.length());
}

void QueryCheckCached(fuse_req_t req, fuse_ino_t ino, size_t size) {
auto fs = Client()->GetFileSystem();

std::string data;
QueryCheckCachedTask(ino, &data);
if (size == 0) {
return fs->ReplyXattr(req, data.length());
}
return fs->ReplyBuffer(req, data.data(), data.length());
}

void ListWarmup(fuse_req_t req, size_t size) {
auto fs = Client()->GetFileSystem();

Expand Down Expand Up @@ -944,6 +979,8 @@ void FuseOpGetXattr(fuse_req_t req,
return ListWarmup(req, size);
} else if (IsWarmupXAttr(name)) {
return QueryWarmup(req, ino, size);
} else if (IsCheckXAttr(name)) {
return QueryCheckCached(req, ino, size);
}

rc = Client()->FuseOpGetXattr(req, ino, name, &value, size);
Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/filesystem/xattr.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
const char XATTR_DIR_PREFIX[] = "curve.dir";
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
const char XATTR_WARMUP_OP_CHECK[] = "curvefs.warmup.check";

inline bool IsSpecialXAttr(const std::string& key) {
static std::map<std::string, bool> xattrs {
Expand All @@ -65,6 +66,10 @@ inline bool IsWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP;
}

inline bool IsCheckXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_CHECK;
}

inline bool IsListWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_LIST;
}
Expand Down
18 changes: 13 additions & 5 deletions curvefs/src/client/fuse_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,18 +312,18 @@ class FuseClient {
bool PutWarmFilelistTask(fuse_ino_t key, common::WarmupStorageType type,
const std::string& path,
const std::string& mount_point,
const std::string& root) {
const std::string& root, bool check = false) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFilelist(key, type, path,
mount_point, root);
mount_point, root, check);
} // only support s3
return true;
}

bool PutWarmFileTask(fuse_ino_t key, const std::string &path,
common::WarmupStorageType type) {
bool PutWarmFileTask(fuse_ino_t key, const std::string& path,
common::WarmupStorageType type, bool check = false) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->AddWarmupFile(key, path, type);
return warmupManager_->AddWarmupFile(key, path, type, check);
} // only support s3
return true;
}
Expand All @@ -342,6 +342,14 @@ class FuseClient {
return false;
}

bool GetCheckCachedProgress(fuse_ino_t key,
warmup::WarmupProgress* progress) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->QueryCheckCachedProgress(key, progress);
}
return false;
}

bool GetAllWarmupProgress(Filepath2WarmupProgressMap* filepath2progress) {
if (fsInfo_->fstype() == FSType::TYPE_S3) {
return warmupManager_->ListWarmupProgress(filepath2progress);
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/kvclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ cc_library(
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
"@libmemcached",
"@fmt//:fmt",
],
)
3 changes: 3 additions & 0 deletions curvefs/src/client/kvclient/kvclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <libmemcached-1.0/types/return.h>

#include <cstdint>
#include <string>

namespace curvefs {
Expand Down Expand Up @@ -54,6 +55,8 @@ class KVClient {
virtual bool Get(const std::string& key, char* value, uint64_t offset,
uint64_t length, std::string* errorlog,
uint64_t* actLength, memcached_return_t* retCod) = 0;

virtual bool Exist(const std::string& key) = 0;
};

} // namespace client
Expand Down
7 changes: 7 additions & 0 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,12 @@ int KVClientManager::GetKvCache(
return 0;
}

void KVClientManager::Exist(std::shared_ptr<ExistKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
task->res = client_->Exist(task->key);
OnReturn(&kvClientManagerMetric_->exist, task);
});
}

} // namespace client
} // namespace curvefs
39 changes: 20 additions & 19 deletions curvefs/src/client/kvclient/kvclient_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ namespace client {
class KVClientManager;
struct SetKVCacheTask;
struct GetKVCacheTask;

class GetKvCacheContext;
class SetKvCacheContext;
struct ExistKVCacheTask;

using curve::common::GetObjectAsyncContext;
using curve::common::TaskThreadPool;
Expand All @@ -58,6 +56,8 @@ using SetKVCacheDone =
std::function<void(const std::shared_ptr<SetKVCacheTask>&)>;
using GetKVCacheDone =
std::function<void(const std::shared_ptr<GetKVCacheTask>&)>;
using ExistKVCacheDone =
std::function<void(const std::shared_ptr<ExistKVCacheTask>&)>;

struct SetKVCacheTask {
std::string key;
Expand All @@ -79,7 +79,7 @@ struct SetKVCacheTask {
};

struct GetKVCacheTask {
const std::string& key;
std::string key;
char* value;
uint64_t offset;
uint64_t valueLength;
Expand All @@ -101,11 +101,21 @@ struct GetKVCacheTask {
timer(butil::Timer::STARTED) {}
};

using GetKvCacheCallBack =
std::function<void(const std::shared_ptr<GetKvCacheContext>&)>;
struct ExistKVCacheTask {
std::string key;
bool res;
uint64_t length = 0; // useless,just for OnReturn
ExistKVCacheDone done;
butil::Timer timer;

using SetKvCacheCallBack =
std::function<void(const std::shared_ptr<SetKvCacheContext>&)>;
explicit ExistKVCacheTask(
const std::string& k,
ExistKVCacheDone done = [](const std::shared_ptr<ExistKVCacheTask>&) {})
: key(k),
res(false),
done(std::move(done)),
timer(butil::Timer::STARTED) {}
};

struct KvCacheContext {
std::string key;
Expand All @@ -117,17 +127,6 @@ struct KvCacheContext {
uint64_t startTime;
};

struct GetKvCacheContext : KvCacheContext {
char* value;
bool res;
GetKvCacheCallBack cb;
};

struct SetKvCacheContext : KvCacheContext {
const char* value;
SetKvCacheCallBack cb;
};

class KVClientManager {
public:
KVClientManager() = default;
Expand All @@ -146,6 +145,8 @@ class KVClientManager {

void Get(std::shared_ptr<GetKVCacheTask> task);

void Exist(std::shared_ptr<ExistKVCacheTask> task);

KVClientManagerMetric* GetMetricForTesting() {
return kvClientManagerMetric_.get();
}
Expand Down
34 changes: 34 additions & 0 deletions curvefs/src/client/kvclient/memcache_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,44 @@

#include "curvefs/src/client/kvclient/memcache_client.h"

#include <libmemcached-1.0/exist.h>
#include <libmemcached-1.0/types/return.h>

#include "src/client/client_metric.h"

namespace curvefs {
namespace client {

thread_local memcached_st* tcli = nullptr;

bool MemCachedClient::Exist(const std::string& key) {
// https://awesomized.github.io/libmemcached/libmemcached/memcached_exist.html?highlight=exist#_CPPv415memcached_existP12memcached_stPcP6size_t
memcached_return_t ue;
size_t value_length = 0;
uint64_t start = butil::cpuwide_time_us();
if (nullptr == tcli) {
LOG(ERROR) << "create tcli";
tcli = memcached_clone(nullptr, client_);
}
ue = memcached_exist(tcli, key.c_str(), key.length());
if (ue == MEMCACHED_SUCCESS) {
curve::client::CollectMetrics(&metric_->exist, 0,
butil::cpuwide_time_us() - start);
return true;
}

if (ue == MEMCACHED_NOTFOUND) {
curve::client::CollectMetrics(&metric_->exist, 0,
butil::cpuwide_time_us() - start);
} else {
std::string errorlog = ResError(ue);
LOG(ERROR) << "Exist key = " << key << " error = " << errorlog;
metric_->exist.eps.count << 1;
}
memcached_free(tcli);
tcli = nullptr;
return false;
}

} // namespace client
} // namespace curvefs
Loading