Skip to content

Commit

Permalink
feat(diskq): 添加关闭机制
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Nov 22, 2022
1 parent 526f0f7 commit 8e49455
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 3 deletions.
12 changes: 11 additions & 1 deletion mq_impl/diskq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/Ccheers/kratos-mq/mq"
Expand All @@ -24,6 +25,8 @@ type ConsumerImpl struct {
sf singleflight.Group
consumerChan map[string]chan mq.Message
pool routinepool.Pool

status uint32
}

func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) {
Expand All @@ -33,6 +36,7 @@ func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) {
logger: logger,
consumerChan: make(map[string]chan mq.Message),
pool: routinepool.NewPool("[diskq][Consumer]", 4, routinepool.NewConfig()),
status: statusRunning,
}, nil
}

Expand All @@ -52,6 +56,9 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
ch := make(chan mq.Message, 1)
x.pool.Go(func(ctx context.Context) {
for {
if atomic.LoadUint32(&x.status) == statusClosed {
return
}
select {
case body := <-queue.ReadChan():
msg, err := mq.NewMessageFromByte(body)
Expand All @@ -70,9 +77,12 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
}

func (x *ConsumerImpl) Close(ctx context.Context) error {
if !atomic.CompareAndSwapUint32(&x.status, statusRunning, statusClosed) {
return nil
}
x.mu.Lock()
defer x.mu.Unlock()
for uniKey, _ := range x.consumerChan {
for uniKey := range x.consumerChan {
close(x.consumerChan[uniKey])
}
err := gDiskQueueManager.Close(ctx)
Expand Down
6 changes: 6 additions & 0 deletions mq_impl/diskq/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package diskq

const (
statusRunning uint32 = 0
statusClosed uint32 = 1
)
4 changes: 3 additions & 1 deletion mq_impl/mqtt/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync/atomic"

"github.com/Ccheers/kratos-mq/mq"
"github.com/eclipse/paho.mqtt.golang"
"github.com/go-kratos/kratos/v2/log"
)

Expand Down Expand Up @@ -51,6 +50,9 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri

ch := make(chan mq.Message, 1)
token := x.client.Subscribe(topic, x.cfg.WillQos, func(client mqtt.Client, message mqtt.Message) {
if atomic.LoadUint32(&x.status) == statusClosed {
return
}
msg, err := mq.NewMessageFromByte(message.Payload())
if err != nil {
x.logger.Errorw("topic", topic, "channel", channel, "payload", string(message.Payload()), "err", err)
Expand Down
10 changes: 9 additions & 1 deletion mq_impl/nsq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/Ccheers/kratos-mq/mq"
"github.com/Ccheers/kratos-mq/mq_impl/nsq/config"
"github.com/go-kratos/kratos/v2/log"
"github.com/nsqio/go-nsq"
)

var _ mq.Consumer = (*ConsumerImpl)(nil)
Expand All @@ -22,6 +22,8 @@ type ConsumerImpl struct {
mu sync.Mutex
consumerMap map[string]*nsq.Consumer
consumerChan map[string]chan mq.Message

status uint32
}

func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) {
Expand Down Expand Up @@ -53,6 +55,9 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri

ch := make(chan mq.Message, 1)
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
if atomic.LoadUint32(&x.status) == statusClosed {
return nil
}
msg, err := mq.NewMessageFromByte(message.Body)
if err != nil {
return err
Expand All @@ -73,6 +78,9 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
}

func (x *ConsumerImpl) Close(ctx context.Context) error {
if !atomic.CompareAndSwapUint32(&x.status, statusRunning, statusClosed) {
return nil
}
x.mu.Lock()
defer x.mu.Unlock()
for uniKey, consumer := range x.consumerMap {
Expand Down
6 changes: 6 additions & 0 deletions mq_impl/nsq/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package nsq

const (
statusRunning uint32 = 0
statusClosed uint32 = 1
)

0 comments on commit 8e49455

Please sign in to comment.