From ca7fc0df72fae55d3c852eed53d554e7132520d2 Mon Sep 17 00:00:00 2001 From: vividwei Date: Sun, 8 Oct 2023 15:47:49 +0800 Subject: [PATCH 1/6] =?UTF-8?q?syncx:=20=E6=94=AF=E6=8C=81=E5=88=86key?= =?UTF-8?q?=E5=8A=A0=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .CHANGELOG.md | 2 +- syncx/segment_key_lock.go | 67 ++++++++++++++++++++++++++++++++++ syncx/segment_key_lock_test.go | 54 +++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 syncx/segment_key_lock.go create mode 100644 syncx/segment_key_lock_test.go diff --git a/.CHANGELOG.md b/.CHANGELOG.md index 905ad1b..6d0966d 100644 --- a/.CHANGELOG.md +++ b/.CHANGELOG.md @@ -1,5 +1,5 @@ # 开发中 - +- [syncx: 支持分key加锁](https://github.com/ecodeclub/ekit/pull/224) # v0.0.8 - [atomicx: 泛型封装 atomic.Value](https://github.com/gotomicro/ekit/pull/101) - [queue: API 定义](https://github.com/gotomicro/ekit/pull/109) diff --git a/syncx/segment_key_lock.go b/syncx/segment_key_lock.go new file mode 100644 index 0000000..32b0ccb --- /dev/null +++ b/syncx/segment_key_lock.go @@ -0,0 +1,67 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncx + +import ( + "hash/fnv" + "sync" +) + +// SegmentKeysLock 部分key lock结构定义 +type SegmentKeysLock struct { + locks []sync.RWMutex +} + +// NewSegmentKeysLock 创建 SegmentKeysLock 示例 +func NewSegmentKeysLock(size int) *SegmentKeysLock { + return &SegmentKeysLock{ + locks: make([]sync.RWMutex, size), + } +} + +// hash 索引锁的hash函数 +func (s *SegmentKeysLock) hash(key string) uint32 { + h := fnv.New32a() + h.Write([]byte(key)) + return h.Sum32() +} + +// RLock 读锁加锁 +func (s *SegmentKeysLock) RLock(key string) { + hash := s.hash(key) + lock := &s.locks[hash%uint32(len(s.locks))] + lock.RLock() +} + +// RLock 读锁解锁 +func (s *SegmentKeysLock) RUnlock(key string) { + hash := s.hash(key) + lock := &s.locks[hash%uint32(len(s.locks))] + lock.RUnlock() +} + +// Lock 写锁加锁 +func (s *SegmentKeysLock) Lock(key string) { + hash := s.hash(key) + lock := &s.locks[hash%uint32(len(s.locks))] + lock.Lock() +} + +// Unlock 写锁解锁 +func (s *SegmentKeysLock) Unlock(key string) { + hash := s.hash(key) + lock := &s.locks[hash%uint32(len(s.locks))] + lock.Unlock() +} diff --git a/syncx/segment_key_lock_test.go b/syncx/segment_key_lock_test.go new file mode 100644 index 0000000..00cf927 --- /dev/null +++ b/syncx/segment_key_lock_test.go @@ -0,0 +1,54 @@ +// Copyright 2021 ecodeclub +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncx + +import ( + "sync" + "testing" + "time" +) + +func TestSegmentKeysLock(t *testing.T) { + s := NewSegmentKeysLock(10) + key := "test_key" + + var wg sync.WaitGroup + wg.Add(2) + + // 写 goroutine + go func() { + defer wg.Done() + s.Lock(key) + defer s.Unlock(key) + + // 模拟写操作 + time.Sleep(100 * time.Millisecond) + }() + + // 等待一段时间以确保写 goroutine 先获取锁 + time.Sleep(50 * time.Millisecond) + + // 读 goroutine + go func() { + defer wg.Done() + s.RLock(key) + defer s.RUnlock(key) + + // 如果读写锁工作正常,这个打印语句应该在写 goroutine 完成后才执行 + t.Log("Read operation executed") + }() + + wg.Wait() +} From b461049f83ba33d337d206f8f9f1f91cf35d86f1 Mon Sep 17 00:00:00 2001 From: vividwei Date: Sun, 8 Oct 2023 20:38:28 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B=E4=B8=BA=E6=9B=B4=E5=8F=AF=E9=9D=A0=E7=9A=84?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- syncx/segment_key_lock_test.go | 42 +++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/syncx/segment_key_lock_test.go b/syncx/segment_key_lock_test.go index 00cf927..4aca354 100644 --- a/syncx/segment_key_lock_test.go +++ b/syncx/segment_key_lock_test.go @@ -17,38 +17,54 @@ package syncx import ( "sync" "testing" - "time" + + "github.com/stretchr/testify/assert" ) func TestSegmentKeysLock(t *testing.T) { s := NewSegmentKeysLock(10) key := "test_key" - var wg sync.WaitGroup wg.Add(2) + writeDone := false + readStarted := false + cond := sync.NewCond(&sync.Mutex{}) + cond.L.Lock() // 写 goroutine go func() { defer wg.Done() - s.Lock(key) - defer s.Unlock(key) - - // 模拟写操作 - time.Sleep(100 * time.Millisecond) + s.Lock(key) // 模拟写操作 + writeDone = true + cond.Broadcast() + s.Unlock(key) }() - // 等待一段时间以确保写 goroutine 先获取锁 - time.Sleep(50 * time.Millisecond) - // 读 goroutine go func() { defer wg.Done() + cond.L.Lock() + defer cond.L.Unlock() + + // 等待写操作完成 + for !writeDone { + cond.Wait() + } + + readStarted = true + cond.Broadcast() s.RLock(key) defer s.RUnlock(key) - - // 如果读写锁工作正常,这个打印语句应该在写 goroutine 完成后才执行 - t.Log("Read operation executed") }() + // 等待读操作开始 + for !readStarted { + cond.Wait() + } + + // 检查写操作是否已完成 + assert.True(t, writeDone, "Write operation did not complete before read operation started") + + cond.L.Unlock() wg.Wait() } From e8afd7aa092a922c1d7433533b0b921e9c9375b3 Mon Sep 17 00:00:00 2001 From: vividwei Date: Sun, 8 Oct 2023 20:48:39 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B=E7=9A=84data=20race=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=8C=E5=B0=86=E7=9B=B4=E6=8E=A5=E8=B5=8B=E5=80=BC=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E5=8E=9F=E5=AD=90=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- syncx/segment_key_lock_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/syncx/segment_key_lock_test.go b/syncx/segment_key_lock_test.go index 4aca354..2aa88cc 100644 --- a/syncx/segment_key_lock_test.go +++ b/syncx/segment_key_lock_test.go @@ -16,6 +16,7 @@ package syncx import ( "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -26,8 +27,8 @@ func TestSegmentKeysLock(t *testing.T) { key := "test_key" var wg sync.WaitGroup wg.Add(2) - writeDone := false - readStarted := false + var writeDone int32 + var readStarted int32 cond := sync.NewCond(&sync.Mutex{}) cond.L.Lock() @@ -35,7 +36,7 @@ func TestSegmentKeysLock(t *testing.T) { go func() { defer wg.Done() s.Lock(key) // 模拟写操作 - writeDone = true + atomic.StoreInt32(&writeDone, 1) cond.Broadcast() s.Unlock(key) }() @@ -47,23 +48,23 @@ func TestSegmentKeysLock(t *testing.T) { defer cond.L.Unlock() // 等待写操作完成 - for !writeDone { + for atomic.LoadInt32(&writeDone) != 1 { cond.Wait() } - readStarted = true + atomic.StoreInt32(&readStarted, 1) cond.Broadcast() s.RLock(key) defer s.RUnlock(key) }() // 等待读操作开始 - for !readStarted { + for atomic.LoadInt32(&readStarted) != 1 { cond.Wait() } // 检查写操作是否已完成 - assert.True(t, writeDone, "Write operation did not complete before read operation started") + assert.Equal(t, int32(1), atomic.LoadInt32(&writeDone), "Write operation did not complete before read operation started") cond.L.Unlock() wg.Wait() From 97f91faaef3df6dc14c2972780b2abf833aab50c Mon Sep 17 00:00:00 2001 From: vividwei Date: Mon, 9 Oct 2023 10:15:08 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E5=B0=86=E9=94=81=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E6=8C=87=E9=92=88=EF=BC=8C=E5=B0=86=E5=BC=80=E9=94=80=E5=89=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- syncx/segment_key_lock.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/syncx/segment_key_lock.go b/syncx/segment_key_lock.go index 32b0ccb..329dd33 100644 --- a/syncx/segment_key_lock.go +++ b/syncx/segment_key_lock.go @@ -21,13 +21,17 @@ import ( // SegmentKeysLock 部分key lock结构定义 type SegmentKeysLock struct { - locks []sync.RWMutex + locks []*sync.RWMutex } // NewSegmentKeysLock 创建 SegmentKeysLock 示例 func NewSegmentKeysLock(size int) *SegmentKeysLock { + locks := make([]*sync.RWMutex, size) + for i := range locks { + locks[i] = &sync.RWMutex{} + } return &SegmentKeysLock{ - locks: make([]sync.RWMutex, size), + locks: locks, } } @@ -41,27 +45,27 @@ func (s *SegmentKeysLock) hash(key string) uint32 { // RLock 读锁加锁 func (s *SegmentKeysLock) RLock(key string) { hash := s.hash(key) - lock := &s.locks[hash%uint32(len(s.locks))] + lock := s.locks[hash%uint32(len(s.locks))] lock.RLock() } -// RLock 读锁解锁 +// RUnlock 读锁解锁 func (s *SegmentKeysLock) RUnlock(key string) { hash := s.hash(key) - lock := &s.locks[hash%uint32(len(s.locks))] + lock := s.locks[hash%uint32(len(s.locks))] lock.RUnlock() } // Lock 写锁加锁 func (s *SegmentKeysLock) Lock(key string) { hash := s.hash(key) - lock := &s.locks[hash%uint32(len(s.locks))] + lock := s.locks[hash%uint32(len(s.locks))] lock.Lock() } // Unlock 写锁解锁 func (s *SegmentKeysLock) Unlock(key string) { hash := s.hash(key) - lock := &s.locks[hash%uint32(len(s.locks))] + lock := s.locks[hash%uint32(len(s.locks))] lock.Unlock() } From 4595ebac7c8699ef077050b8f72f9acbe88adcef Mon Sep 17 00:00:00 2001 From: vividwei Date: Tue, 10 Oct 2023 11:35:53 +0800 Subject: [PATCH 5/6] =?UTF-8?q?1.=E4=BD=BF=E7=94=A8go1.19=20atomic?= =?UTF-8?q?=E7=89=B9=E6=80=A7=EF=BC=8C=E4=B8=8D=E4=BD=BF=E7=94=A8=E5=87=BD?= =?UTF-8?q?=E6=95=B0=E5=BC=8F=EF=BC=8C=E4=BD=BF=E7=94=A8=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E3=80=82=202.=E7=BB=99=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B3=A8=E9=87=8A=E8=AF=B4=E6=98=8E=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- syncx/segment_key_lock_test.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/syncx/segment_key_lock_test.go b/syncx/segment_key_lock_test.go index 2aa88cc..5b261e9 100644 --- a/syncx/segment_key_lock_test.go +++ b/syncx/segment_key_lock_test.go @@ -27,18 +27,20 @@ func TestSegmentKeysLock(t *testing.T) { key := "test_key" var wg sync.WaitGroup wg.Add(2) - var writeDone int32 - var readStarted int32 + var writeDone atomic.Bool + var readStarted atomic.Bool + val := false cond := sync.NewCond(&sync.Mutex{}) cond.L.Lock() // 写 goroutine go func() { defer wg.Done() - s.Lock(key) // 模拟写操作 - atomic.StoreInt32(&writeDone, 1) - cond.Broadcast() + s.Lock(key) + val = true // 加写锁写 s.Unlock(key) + writeDone.Store(true) + cond.Broadcast() }() // 读 goroutine @@ -48,23 +50,24 @@ func TestSegmentKeysLock(t *testing.T) { defer cond.L.Unlock() // 等待写操作完成 - for atomic.LoadInt32(&writeDone) != 1 { + for !writeDone.Load() { cond.Wait() } - atomic.StoreInt32(&readStarted, 1) + readStarted.Store(true) cond.Broadcast() s.RLock(key) + assert.Equal(t, true, val, "Read lock err") // 加读锁读 defer s.RUnlock(key) }() // 等待读操作开始 - for atomic.LoadInt32(&readStarted) != 1 { + for !readStarted.Load() { cond.Wait() } - // 检查写操作是否已完成 - assert.Equal(t, int32(1), atomic.LoadInt32(&writeDone), "Write operation did not complete before read operation started") + // 检查写操作是否已完成,防止意外情况导致读优先写发生 + assert.Equal(t, true, writeDone.Load(), "Write operation did not complete before read operation started") cond.L.Unlock() wg.Wait() From 078e5524cb69c723146c93290c7e8a2806f6c9c4 Mon Sep 17 00:00:00 2001 From: vividwei Date: Tue, 10 Oct 2023 11:48:12 +0800 Subject: [PATCH 6/6] =?UTF-8?q?fmt=20=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- syncx/segment_key_lock_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/syncx/segment_key_lock_test.go b/syncx/segment_key_lock_test.go index 5b261e9..cb7d333 100644 --- a/syncx/segment_key_lock_test.go +++ b/syncx/segment_key_lock_test.go @@ -37,7 +37,7 @@ func TestSegmentKeysLock(t *testing.T) { go func() { defer wg.Done() s.Lock(key) - val = true // 加写锁写 + val = true // 加写锁写 s.Unlock(key) writeDone.Store(true) cond.Broadcast() @@ -57,7 +57,7 @@ func TestSegmentKeysLock(t *testing.T) { readStarted.Store(true) cond.Broadcast() s.RLock(key) - assert.Equal(t, true, val, "Read lock err") // 加读锁读 + assert.Equal(t, true, val, "Read lock err") // 加读锁读 defer s.RUnlock(key) }()