Skip to content

Commit

Permalink
feat(diskq): 本地队列 取消 channel 机制
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Nov 22, 2022
1 parent 922f04d commit 526f0f7
Showing 1 changed file with 10 additions and 19 deletions.
29 changes: 10 additions & 19 deletions mq_impl/diskq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type ConsumerImpl struct {

mu sync.Mutex
sf singleflight.Group
consumerMap map[string][]chan mq.Message
consumerChan map[string]chan mq.Message
pool routinepool.Pool
}
Expand All @@ -33,7 +32,6 @@ func NewConsumer(c *config.Config, logger log.Logger) (mq.Consumer, error) {
c: c,
logger: logger,
consumerChan: make(map[string]chan mq.Message),
consumerMap: make(map[string][]chan mq.Message),
pool: routinepool.NewPool("[diskq][Consumer]", 4, routinepool.NewConfig()),
}, nil
}
Expand All @@ -42,40 +40,33 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
x.mu.Lock()
defer x.mu.Unlock()

uniKey := x.generateKey(topic, channel)
if ch, ok := x.consumerChan[uniKey]; ok {
if ch, ok := x.consumerChan[topic]; ok {
return ch, nil
}

_, _, _ = x.sf.Do(topic, func() (interface{}, error) {
if consumer, ok := x.consumerMap[topic]; ok {
return consumer, nil
ch, _, _ := x.sf.Do(topic, func() (interface{}, error) {
if ch, ok := x.consumerChan[topic]; ok {
return ch, nil
}
queue := gDiskQueueManager.NewDiskQueue(topic, x.c.DataPath, x.c.MaxBytesPerFile, x.c.MinMsgSize, x.c.MaxMsgSize, x.c.SyncEvery, time.Duration(x.c.SyncTimeout)*time.Millisecond, x.logger)
ch := make(chan mq.Message, 1)
x.pool.Go(func(ctx context.Context) {
for {
select {
case body := <-queue.ReadChan():
msg, err := mq.NewMessageFromByte(body)
if err != nil {
x.logger.Log(log.LevelError, "module", "NewMessageFromByte", "err", err, "body", string(body))
}
for _, cch := range x.consumerMap[topic] {
select {
case cch <- msg:
default:
}
_ = x.logger.Log(log.LevelError, "module", "NewMessageFromByte", "err", err, "body", string(body))
}
ch <- msg
}
}
})
return queue, nil
return ch, nil
})

ch := make(chan mq.Message, 1024)
x.consumerMap[topic] = append(x.consumerMap[topic], ch)
x.consumerChan[uniKey] = ch
return ch, nil
x.consumerChan[topic] = ch.(chan mq.Message)
return x.consumerChan[topic], nil
}

func (x *ConsumerImpl) Close(ctx context.Context) error {
Expand Down

0 comments on commit 526f0f7

Please sign in to comment.