Skip to content

Commit

Permalink
feat: add rejection log sink handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ademekici committed Dec 16, 2024
1 parent 379756d commit f3ccc8c
Show file tree
Hide file tree
Showing 12 changed files with 622 additions and 48 deletions.
98 changes: 50 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,51 +38,51 @@ used for both connectors.

```go
func mapper(event couchbase.Event) []message.KafkaMessage {
// return nil if you wish to discard the event
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
// return nil if you wish to discard the event
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
}

func main() {
c, err := dcpkafka.NewConnector(&config.Connector{
Dcp: dcpConfig.Dcp{
Hosts: []string{"localhost:8091"},
Username: "user",
Password: "password",
BucketName: "dcp-test",
Dcp: dcpConfig.ExternalDcp{
Group: dcpConfig.DCPGroup{
Name: "groupName",
Membership: dcpConfig.DCPGroupMembership{
RebalanceDelay: 3 * time.Second,
},
},
},
Metadata: dcpConfig.Metadata{
Config: map[string]string{
"bucket": "checkpoint-bucket-name",
"scope": "_default",
"collection": "_default",
},
Type: "couchbase",
},
Debug: true},
Kafka: config.Kafka{
CollectionTopicMapping: map[string]string{"_default": "topic"},
Brokers: []string{"localhost:9092"},
},
}, mapper)
if err != nil {
panic(err)
}

defer c.Close()
c.Start()
c, err := dcpkafka.NewConnector(&config.Connector{
Dcp: dcpConfig.Dcp{
Hosts: []string{"localhost:8091"},
Username: "user",
Password: "password",
BucketName: "dcp-test",
Dcp: dcpConfig.ExternalDcp{
Group: dcpConfig.DCPGroup{
Name: "groupName",
Membership: dcpConfig.DCPGroupMembership{
RebalanceDelay: 3 * time.Second,
},
},
},
Metadata: dcpConfig.Metadata{
Config: map[string]string{
"bucket": "checkpoint-bucket-name",
"scope": "_default",
"collection": "_default",
},
Type: "couchbase",
},
Debug: true},
Kafka: config.Kafka{
CollectionTopicMapping: map[string]string{"_default": "topic"},
Brokers: []string{"localhost:9092"},
},
}, mapper)
if err != nil {
panic(err)
}

defer c.Close()
c.Start()
}
```

Expand Down Expand Up @@ -120,6 +120,8 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)
| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
| `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |
| `kafka.rejectionLog.Topic` | string | no | | Rejection topic name. |
| `kafka.rejectionLog.IncludeValue` | boolean | no | false | Includes rejection log source info. `false` is default. |

### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka)

Expand All @@ -131,19 +133,19 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

## Exposed metrics

| Metric Name | Description | Labels | Value Type |
| Metric Name | Description | Labels | Value Type |
|-------------------------------------------------------|----------------------------------------|--------|------------|
| cbgo_kafka_connector_latency_ms_current | Time to adding to the batch. | N/A | Gauge |
| cbgo_kafka_connector_latency_ms_current | Time to adding to the batch. | N/A | Gauge |
| cbgo_kafka_connector_batch_produce_latency_ms_current | Time to produce messages in the batch. | N/A | Gauge |

You can also use all DCP-related metrics explained [here](https://github.com/Trendyol/go-dcp#exposed-metrics).
All DCP-related metrics are automatically injected. It means you don't need to do anything.
All DCP-related metrics are automatically injected. It means you don't need to do anything.

## Breaking Changes

| Date taking effect | Date announced | Change | How to check |
|--------------------| ---- |---- |-----------------|
| November 11, 2023 | November 11, 2023 | Creating connector via builder | Compile project |
| Date taking effect | Date announced | Change | How to check |
|--------------------|-------------------|--------------------------------|-----------------|
| November 11, 2023 | November 11, 2023 | Creating connector via builder | Compile project |

## Grafana Metric Dashboard

Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type Kafka struct {
Compression int8 `yaml:"compression"`
SecureConnection bool `yaml:"secureConnection"`
AllowAutoTopicCreation bool `yaml:"allowAutoTopicCreation"`
RejectionLog RejectionLog `yaml:"rejectionLog"`
}

type RejectionLog struct {
Topic string `yaml:"topic"`
IncludeValue bool `yaml:"includeValue"`
}

func (k *Kafka) GetBalancer() kafka.Balancer {
Expand Down
22 changes: 22 additions & 0 deletions example/simple-rejection-log-sink-response-handler/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM golang:1.20-alpine as builder

WORKDIR /project

COPY go.mod go.sum ./
COPY main.go ./
COPY config.yml ./config.yml

RUN go mod download
RUN CGO_ENABLED=0 go build -a -o example main.go

FROM alpine:3.17.0

WORKDIR /app

RUN apk --no-cache add ca-certificates

USER nobody
COPY --from=builder --chown=nobody:nobody /project/example .
COPY --from=builder --chown=nobody:nobody /project/config.yml ./config.yml

ENTRYPOINT ["./example"]
35 changes: 35 additions & 0 deletions example/simple-rejection-log-sink-response-handler/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
hosts:
- localhost:8091
username: user
password: password
bucketName: dcp-test
scopeName: _default
collectionNames:
- _default
metadata:
type: couchbase
config:
bucket: checkpoint-bucket-name
scope: _default
collection: _default
dcp:
group:
name: groupName
membership:
type: static
kafka:
collectionTopicMapping:
_default: topicname
brokers:
- localhost:9092
# SSL configurations
#
# secureConnection: true
# Config support env variable "$HOME/example/..."
# rootCAPath: "example/stretch-kafka/rootCA.pem"
# interCAPath: "example/stretch-kafka/interCA.pem"
# scramUsername: "username"
# scramPassword: "password"
rejectionLog:
topic: "rejection-topic"
includeValue: true
75 changes: 75 additions & 0 deletions example/simple-rejection-log-sink-response-handler/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
module example

go 1.20

replace github.com/Trendyol/go-dcp-kafka => ./../..

require github.com/Trendyol/go-dcp-kafka v0.0.0

require (
github.com/Trendyol/go-dcp v1.2.0-rc.4 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.7.0 // indirect
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/couchbase/gocbcore/v10 v10.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gofiber/fiber/v2 v2.52.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mhmtszr/concurrent-swiss-map v1.0.8 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.58.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/segmentio/kafka-go v0.4.47 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.57.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.9.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.4 // indirect
k8s.io/apimachinery v0.29.4 // indirect
k8s.io/client-go v0.29.4 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Loading

0 comments on commit f3ccc8c

Please sign in to comment.