Skip to content

Commit

Permalink
feat: logging implemented with go-dcp logger (#3)
Browse files Browse the repository at this point in the history
* feat: logging implemented with go-dcp logger
  • Loading branch information
ramazan authored Sep 27, 2023
1 parent d8a7cfd commit 82f11fb
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 117 deletions.
41 changes: 22 additions & 19 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package dcpcouchbase
import (
"errors"

"github.com/Trendyol/go-dcp-couchbase/metric"
"github.com/sirupsen/logrus"

"github.com/Trendyol/go-dcp-couchbase/config"
"github.com/Trendyol/go-dcp-couchbase/couchbase"
"github.com/Trendyol/go-dcp-couchbase/metric"

"github.com/Trendyol/go-dcp"
dcpClientConfig "github.com/Trendyol/go-dcp/config"
Expand All @@ -20,12 +21,10 @@ type Connector interface {
}

type connector struct {
dcp dcp.Dcp
config *config.Config
mapper Mapper
logger logger.Logger
errorLogger logger.Logger
processor *couchbase.Processor
dcp dcp.Dcp
config *config.Config
mapper Mapper
processor *couchbase.Processor
}

func (c *connector) Start() {
Expand Down Expand Up @@ -64,38 +63,36 @@ func (c *connector) listener(ctx *models.ListenerContext) {
c.processor.AddActions(ctx, e.EventTime, actions)
}

func createDcp(cfg any, listener models.Listener, logger logger.Logger, errorLogger logger.Logger) (dcp.Dcp, error) {
func createDcp(cfg any, listener models.Listener) (dcp.Dcp, error) {
switch v := cfg.(type) {
case dcpClientConfig.Dcp:
return dcp.NewDcpWithLoggers(v, listener, logger, errorLogger)
return dcp.NewDcp(v, listener)
case string:
return dcp.NewDcpWithLoggers(v, listener, logger, errorLogger)
return dcp.NewDcp(v, listener)
default:
return nil, errors.New("invalid config")
}
}

func NewConnector(cf any, mapper Mapper, logger logger.Logger, errorLogger logger.Logger) (Connector, error) {
func NewConnector(cf any, mapper Mapper) (Connector, error) {
cfg, err := newConfig(cf)
if err != nil {
return nil, err
}
cfg.ApplyDefaults()

connector := &connector{
mapper: mapper,
config: cfg,
logger: logger,
errorLogger: errorLogger,
mapper: mapper,
config: cfg,
}

if err != nil {
return nil, err
}

dcp, err := createDcp(cfg.Dcp, connector.listener, logger, errorLogger)
dcp, err := createDcp(cfg.Dcp, connector.listener)
if err != nil {
connector.errorLogger.Printf("dcp error: %v", err)
logger.Log.Error("dcp error: %v", err)
return nil, err
}

Expand All @@ -105,8 +102,6 @@ func NewConnector(cf any, mapper Mapper, logger logger.Logger, errorLogger logge
connector.dcp = dcp
processor, err := couchbase.NewProcessor(
cfg,
connector.logger,
connector.errorLogger,
dcp.Commit,
)
if err != nil {
Expand All @@ -128,6 +123,14 @@ func NewConnector(cf any, mapper Mapper, logger logger.Logger, errorLogger logge
return connector, nil
}

func NewConnectorWithLogger(cf any, mapper Mapper, logrus *logrus.Logger) (Connector, error) {
logger.Log = &logger.Loggers{
Logrus: logrus,
}

return NewConnector(cf, mapper)
}

func newConfig(cf any) (*config.Config, error) {
switch v := cf.(type) {
case *config.Config:
Expand Down
4 changes: 2 additions & 2 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *client) Connect() error {

s.agent = agent

logger.Log.Printf("connected to %s, bucket: %s", s.config.Hosts, s.config.BucketName)
logger.Log.Info("connected to %s, bucket: %s", s.config.Hosts, s.config.BucketName)
return nil
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (s *client) DeletePath(ctx context.Context,

func (s *client) Close() {
_ = s.agent.Close()
logger.Log.Printf("connections closed %s", s.config.Hosts)
logger.Log.Info("connections closed %s", s.config.Hosts)
}

func NewClient(config *config.Couchbase) Client {
Expand Down
8 changes: 1 addition & 7 deletions couchbase/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ type Processor struct {
client Client
metric *Metric
agent *gocbcore.Agent
logger logger.Logger
errorLogger logger.Logger
batchTicker *time.Ticker
dcpCheckpointCommit func()
scopeName string
Expand All @@ -41,8 +39,6 @@ type Metric struct {
}

func NewProcessor(config *config.Config,
logger logger.Logger,
errorLogger logger.Logger,
dcpCheckpointCommit func(),
) (*Processor, error) {
client := NewClient(&config.Couchbase)
Expand All @@ -62,8 +58,6 @@ func NewProcessor(config *config.Config,
scopeName: config.Couchbase.ScopeName,
collectionName: config.Couchbase.CollectionName,
dcpCheckpointCommit: dcpCheckpointCommit,
logger: logger,
errorLogger: errorLogger,
metric: &Metric{},
}

Expand Down Expand Up @@ -186,7 +180,7 @@ func (b *Processor) bulkRequest() {
panicOrGo(err, &wg)
})
default:
b.errorLogger.Printf("Unexpected action type: %v", v.Type)
logger.Log.Error("Unexpected action type: %v", v.Type)
}

if err != nil {
Expand Down
12 changes: 5 additions & 7 deletions example/struct-config/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ go 1.20
replace github.com/Trendyol/go-dcp-couchbase => ./../..

require (
github.com/Trendyol/go-dcp v1.0.0
github.com/Trendyol/go-dcp-couchbase v0.0.0
github.com/Trendyol/go-dcp v1.1.0
github.com/Trendyol/go-dcp-couchbase v1.0.0
)

require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -39,18 +38,16 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mhmtszr/concurrent-swiss-map v0.0.9 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/tinylib/msgp v1.1.8 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.48.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
Expand All @@ -62,6 +59,7 @@ require (
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit 82f11fb

Please sign in to comment.