Skip to content

Commit

Permalink
feat(nsq): nsq impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Ccheers committed Jul 12, 2022
1 parent 0349324 commit 5f2e6a7
Show file tree
Hide file tree
Showing 28 changed files with 691 additions and 172 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ require (
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect
google.golang.org/grpc v1.46.2 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -241,6 +242,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -263,6 +265,7 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd h1:e0TwkXOdbnH/1x5rc5MZ/VYyiZ4v+RdVfrGMqEwT68I=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -271,6 +274,7 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2 h1:u+MLGgVf7vRdjEYZ8wDFhAVNmhkbJ5hmrA1LMWK1CAQ=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
13 changes: 0 additions & 13 deletions internal/nsq/config/config.proto

This file was deleted.

3 changes: 1 addition & 2 deletions mq/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mq
import (
"context"

messagev1 "github.com/Ccheers/kratos-mq/mq/message/v1"
"github.com/go-kratos/kratos/v2/encoding"
ejson "github.com/go-kratos/kratos/v2/encoding/json"
eproto "github.com/go-kratos/kratos/v2/encoding/proto"
Expand Down Expand Up @@ -36,7 +35,7 @@ func DefaultEncodeFunc(ctx context.Context, args interface{}) (Message, error) {
if err != nil {
return nil, err
}
return messagev1.NewMessage(b, messagev1.WithMetadata(md)), nil
return NewMessage(b, MessageOptionWithMetadata(md)), nil
}

func DefaultDecodeFunc(ctx context.Context, message Message, args interface{}) error {
Expand Down
46 changes: 24 additions & 22 deletions mq/message/v1/message.go → mq/message.v1.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
package v1
package mq

import (
"errors"
"fmt"
"hash/crc32"
"time"

"github.com/Ccheers/kratos-mq/mq"
messagev1 "github.com/Ccheers/kratos-mq/mq/message/v1"
"github.com/go-kratos/kratos/v2/metadata"
"google.golang.org/protobuf/proto"
)

type MessageV1 messagev1.Message

var (
ErrMessageIsNil = errors.New("message is nil show alloc memory")
ErrMessageIsInvalid = errors.New("message is invalid")
)

type MessageOption func(x *Message)
type MessageOption func(x *MessageV1)

func WithMetadata(md metadata.Metadata) MessageOption {
return func(x *Message) {
func MessageOptionWithMetadata(md metadata.Metadata) MessageOption {
return func(x *MessageV1) {
x.Md = md
}
}

var _ mq.Message = (*Message)(nil)
var _ Message = (*MessageV1)(nil)

func NewMessage(payload mq.Payload, opts ...MessageOption) mq.Message {
m := &Message{Data: payload}
func NewMessage(payload Payload, opts ...MessageOption) Message {
m := &MessageV1{Data: payload}
// 校验和计算
defer m.assignValidSum()
for _, opt := range opts {
Expand All @@ -40,66 +42,66 @@ func NewMessage(payload mq.Payload, opts ...MessageOption) mq.Message {
return m
}

func NewMessageFromByte(b []byte) (mq.Message, error) {
m := &Message{}
func NewMessageFromByte(b []byte) (Message, error) {
m := &MessageV1{}
err := m.UnMarshal(b)
if err != nil {
return nil, err
}
return m, nil
}

func (x *Message) Metadata() metadata.Metadata {
func (x *MessageV1) Metadata() metadata.Metadata {
return x.Md
}

func (x *Message) Payload() mq.Payload {
func (x *MessageV1) Payload() Payload {
return x.Data
}

func (x *Message) Err() error {
func (x *MessageV1) Err() error {
return fmt.Errorf(x.Error)
}

func (x *Message) UniKey() string {
func (x *MessageV1) UniKey() string {
return x.Key
}

func (x *Message) Check() error {
func (x *MessageV1) Check() error {
vs := x.generateValidSum()
if x.ValidSum != vs {
return ErrMessageIsInvalid
}
return nil
}

func (x *Message) Marshal() ([]byte, error) {
return proto.Marshal(x)
func (x *MessageV1) Marshal() ([]byte, error) {
return proto.Marshal((*messagev1.Message)(x))
}

func (x *Message) UnMarshal(bytes []byte) error {
func (x *MessageV1) UnMarshal(bytes []byte) error {
if x == nil {
return ErrMessageIsNil
}
return proto.Unmarshal(bytes, x)
return proto.Unmarshal(bytes, (*messagev1.Message)(x))
}

// ------------------------------ private ------------------------------

func (x *Message) generateUniKey() string {
func (x *MessageV1) generateUniKey() string {
b, _ := x.Marshal()
c32ID := crc32.ChecksumIEEE(b)
return fmt.Sprintf("%d-%d", time.Now().Unix(), c32ID)
}

func (x *Message) generateValidSum() uint32 {
func (x *MessageV1) generateValidSum() uint32 {
m := *x
m.ValidSum = 0
b, _ := m.Marshal()
return crc32.ChecksumIEEE(b)
}

func (x *Message) assignValidSum() {
func (x *MessageV1) assignValidSum() {
x.ValidSum = 0
x.ValidSum = x.generateValidSum()
}
21 changes: 17 additions & 4 deletions mq/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"time"

"github.com/ccheers/xpkg/sync/routinepool"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/middleware/recovery"
)

type serverOptions struct {
Expand Down Expand Up @@ -35,9 +37,19 @@ type Server struct {

func NewServer(consumer Consumer, opts ...ServerOptionFunc) *Server {
options := &serverOptions{
timeout: time.Second * 3,
decodeFunc: DefaultDecodeFunc,
errHandler: ErrorHandlerFunc(DefaultErrorHandler),
timeout: time.Second * 3,
decodeFunc: DefaultDecodeFunc,
errHandler: ErrorHandlerFunc(DefaultErrorHandler),
cap: 4,
scaleThreshold: 4,
ms: []middleware.Middleware{
recovery.Recovery(
recovery.WithHandler(func(ctx context.Context, req, err interface{}) error {
log.Errorf("panic recover err=%v req=%+v", err, req)
return nil
}),
),
},
}
for _, f := range opts {
f(options)
Expand All @@ -46,7 +58,7 @@ func NewServer(consumer Consumer, opts ...ServerOptionFunc) *Server {
ScaleThreshold: options.scaleThreshold,
})
pool.SetPanicHandler(func(ctx context.Context, err error) {
options.errHandler(err)
options.errHandler.Handle(err)
})
return &Server{
consumer: consumer,
Expand Down Expand Up @@ -77,6 +89,7 @@ func (x *Server) Subscriber(ctx context.Context, topic string, channel string, h
for {
select {
case <-ctx.Done():
log.Warnf("routine exit context canceled topic=%s, channel=%s", topic, channel)
return
case msg = <-ch:
ctx, cancel := context.WithTimeout(ctx, x.options.timeout)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions mq_impl/nsq/config/config.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package pkg.mq.nsq.config;

option go_package = "github.com/Ccheers/kratos-mq/mq_impl/nsq/config;config";

message Config {
repeated string nsq_addrs = 1; // nsq 服务地址
uint32 dial_timeout = 2; // 连接超时
uint32 read_timeout = 3; // 读超时
uint32 write_timeout = 4; // 写超时
uint32 batch_size = 5; // 一次性拉取的消息条书
uint32 heartbeat_interval = 6; // 一次性拉取的消息条书
}
File renamed without changes.
14 changes: 7 additions & 7 deletions internal/nsq/consumer.go → mq_impl/nsq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"sync"

"github.com/Ccheers/kratos-mq/internal/nsq/config"
"github.com/Ccheers/kratos-mq/mq"
messagev1 "github.com/Ccheers/kratos-mq/mq/message/v1"
"github.com/Ccheers/kratos-mq/mq_impl/nsq/config"
"github.com/go-kratos/kratos/v2/log"
"github.com/nsqio/go-nsq"
)
Expand Down Expand Up @@ -52,13 +51,9 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
}
consumer.SetLoggerForLevel(x.logger, nsq.LogLevelInfo)

err = consumer.ConnectToNSQDs(x.nsqAddrs)
if err != nil {
return nil, err
}
ch := make(chan mq.Message, 1)
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
msg, err := messagev1.NewMessageFromByte(message.Body)
msg, err := mq.NewMessageFromByte(message.Body)
if err != nil {
return err
}
Expand All @@ -67,6 +62,11 @@ func (x *ConsumerImpl) Subscribe(ctx context.Context, topic string, channel stri
return nil
}))

err = consumer.ConnectToNSQDs(x.nsqAddrs)
if err != nil {
return nil, err
}

x.consumerMap[uniKey] = consumer
x.consumerChan[uniKey] = ch
return ch, nil
Expand Down
Loading

0 comments on commit 5f2e6a7

Please sign in to comment.