Skip to content

Commit

Permalink
Integrate allow auto topic creation config to Kafka client (#63)
Browse files Browse the repository at this point in the history
* Integrate allow auto topic creation config to Kafka client

* Avoid checking topic existing while allow auto topic creation true
  • Loading branch information
mhmtszr authored Sep 18, 2023
1 parent f3f4309 commit 75ffcb6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 16 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)
| `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio, increase it to reduce network requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). |
| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
| `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |

### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka)

Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Kafka struct {
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
Compression int8 `yaml:"compression"`
SecureConnection bool `yaml:"secureConnection"`
AllowAutoTopicCreation bool `yaml:"allowAutoTopicCreation"`
}

func (k *Kafka) GetCompression() int8 {
Expand Down
9 changes: 6 additions & 3 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,13 @@ func createKafkaClient(cc *config.Connector, connector *connector) (kafka.Client
topics = append(topics, topic)
}

if err := kafkaClient.CheckTopics(topics); err != nil {
connector.errorLogger.Printf("collection topic mapping error: %v", err)
return nil, err
if !cc.Kafka.AllowAutoTopicCreation {
if err := kafkaClient.CheckTopics(topics); err != nil {
connector.errorLogger.Printf("collection topic mapping error: %v", err)
return nil, err
}
}

return kafkaClient, nil
}

Expand Down
27 changes: 14 additions & 13 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,20 @@ func (c *client) CheckTopics(topics []string) error {

func (c *client) Producer() *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(c.config.Kafka.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: c.config.Kafka.ProducerBatchSize,
BatchBytes: math.MaxInt,
BatchTimeout: 500 * time.Microsecond,
MaxAttempts: math.MaxInt,
ReadTimeout: c.config.Kafka.ReadTimeout,
WriteTimeout: c.config.Kafka.WriteTimeout,
RequiredAcks: kafka.RequiredAcks(c.config.Kafka.RequiredAcks),
Logger: c.logger,
ErrorLogger: c.errorLogger,
Compression: kafka.Compression(c.config.Kafka.GetCompression()),
Transport: c.transport,
Addr: kafka.TCP(c.config.Kafka.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: c.config.Kafka.ProducerBatchSize,
BatchBytes: math.MaxInt,
BatchTimeout: 500 * time.Microsecond,
MaxAttempts: math.MaxInt,
ReadTimeout: c.config.Kafka.ReadTimeout,
WriteTimeout: c.config.Kafka.WriteTimeout,
RequiredAcks: kafka.RequiredAcks(c.config.Kafka.RequiredAcks),
Logger: c.logger,
ErrorLogger: c.errorLogger,
Compression: kafka.Compression(c.config.Kafka.GetCompression()),
Transport: c.transport,
AllowAutoTopicCreation: c.config.Kafka.AllowAutoTopicCreation,
}
}

Expand Down

0 comments on commit 75ffcb6

Please sign in to comment.