diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index af8ae87c32fc..cc59a2c95e8f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -64,7 +64,11 @@ jobs: update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 40 \ --slave /usr/bin/g++ g++ /usr/bin/g++-9 run: | + # Work around https://github.com/actions/checkout/issues/766 + git config --global --add safe.directory /src cd /src + git describe --always --tags ${{ github.sha }} + if [ -d build-opt ]; then chown -R root build-opt ls -l ./build-opt @@ -96,6 +100,12 @@ jobs: submodules: true - name: Build artifacts run: | + # Work around https://github.com/actions/checkout/issues/766 + git config --global --add safe.directory "$GITHUB_WORKSPACE" + + git describe --always --tags ${{ github.sha }} + VERSION=$(git describe --always --tags ${{ github.sha }}) + echo "::set-output name=version::${VERSION}" ./tools/release.sh - name: Upload uses: actions/upload-artifact@v3 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0dc082453798..7210740926ee 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -31,7 +31,9 @@ if(ENABLE_GIT_VERSION) else() set(GIT_CLEAN_DIRTY "") endif() + Message(STATUS "GIT_SHA1 ${GIT_SHA1}") git_describe(GIT_VER --always) + Message(STATUS "GIT_VER ${GIT_VER}") string(TIMESTAMP PRJ_BUILD_TIME "%Y-%m-%d %H:%M:%S" UTC) else(ENABLE_GIT_VERSION) set(GIT_VER "dev") diff --git a/src/server/channel_slice.cc b/src/server/channel_slice.cc index d7e445a73698..cd5d1cd3aa03 100644 --- a/src/server/channel_slice.cc +++ b/src/server/channel_slice.cc @@ -8,6 +8,8 @@ extern "C" { #include "redis/util.h" } +#include "base/logging.h" + namespace dfly { using namespace std; @@ -72,7 +74,10 @@ auto ChannelSlice::FetchSubscribers(string_view channel) -> vector { void ChannelSlice::CopySubscribers(const SubscribeMap& src, const std::string& pattern, vector* dest) { for (const auto& sub : src) { - Subscriber s(sub.first, sub.second.thread_id); + ConnectionContext* cntx = sub.first; + CHECK(cntx->conn_state.subscribe_info); + + Subscriber s(cntx, sub.second.thread_id); s.pattern = pattern; s.borrow_token.Inc(); diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 222567d1d528..fa05d223207b 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -46,11 +46,6 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis } } - if (!to_add && conn_state.subscribe_info->IsEmpty()) { - conn_state.subscribe_info.reset(); - force_dispatch = false; - } - sort(channels.begin(), channels.end()); // prepare the array in order to distribute the updates to the shards. @@ -90,6 +85,12 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis // Update subscription shard_set->RunBriefInParallel(move(cb), [&](ShardId sid) { return shard_idx[sid + 1] > shard_idx[sid]; }); + + // It's important to reset + if (!to_add && conn_state.subscribe_info->IsEmpty()) { + conn_state.subscribe_info.reset(); + force_dispatch = false; + } } if (to_reply) { @@ -139,15 +140,10 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args) } } - if (!to_add && conn_state.subscribe_info->IsEmpty()) { - conn_state.subscribe_info.reset(); - force_dispatch = false; - } - int32_t tid = util::ProactorBase::GetIndex(); DCHECK_GE(tid, 0); - // Update the subscribers on publisher's side. + // Update the subscribers on channel-slice side. auto cb = [&](EngineShard* shard) { ChannelSlice& cs = shard->channel_slice(); for (string_view pattern : patterns) { @@ -161,6 +157,13 @@ void ConnectionContext::ChangePSub(bool to_add, bool to_reply, CmdArgList args) // Update pattern subscription. Run on all shards. shard_set->RunBriefInParallel(move(cb)); + + // Important to reset conn_state.subscribe_info only after all references to it were + // removed from channel slices. + if (!to_add && conn_state.subscribe_info->IsEmpty()) { + conn_state.subscribe_info.reset(); + force_dispatch = false; + } } if (to_reply) { @@ -209,7 +212,8 @@ void ConnectionContext::SendSubscriptionChangedResponse(string_view action, } void ConnectionContext::OnClose() { - if (!conn_state.subscribe_info) return; + if (!conn_state.subscribe_info) + return; if (!conn_state.subscribe_info->channels.empty()) { auto token = conn_state.subscribe_info->borrow_token; diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 4595b12b43bf..cf86fca37ef1 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -301,7 +301,7 @@ TEST_F(DflyEngineTest, Hello) { auto resp = Run({"hello", "2"}); ASSERT_THAT(resp, ArrLen(12)); EXPECT_THAT(resp.GetVec(), - ElementsAre("server", "redis", "version", "df-dev", "proto", + ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING), "proto", IntArg(2), "id", ArgType(RespExpr::INT64), "mode", "standalone", "role", "master")); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index dcddc2e43b25..0eadfcc76d90 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -897,6 +897,9 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { auto cb = [&] { return EngineShard::tlocal()->channel_slice().FetchSubscribers(channel); }; + // How do we know that subsribers did not disappear after we fetched them? + // Each subscriber object hold a borrow_token. + // ConnectionContext::OnClose does not reset subscribe_info before all tokens are returned. vector subscriber_arr = shard_set->Await(sid, std::move(cb)); atomic_uint32_t published{0}; @@ -912,6 +915,8 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { } fibers_ext::BlockingCounter bc(subscriber_arr.size()); + + // We run publish_cb in each subsriber's thread. auto publish_cb = [&, bc](unsigned idx, util::ProactorBase*) mutable { unsigned start = slices[idx]; @@ -921,6 +926,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { break; published.fetch_add(1, memory_order_relaxed); + facade::Connection* conn = subscriber_arr[i].conn_cntx->owner(); DCHECK(conn); facade::Connection::PubMessage pmsg;