Skip to content

Commit

Permalink
fix: message header nil pointer error for rejection log sink response…
Browse files Browse the repository at this point in the history
… handler
  • Loading branch information
ademekici committed Dec 16, 2024
1 parent 4915946 commit 5e45486
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 7 deletions.
9 changes: 3 additions & 6 deletions kafka/rejection_log_sink_response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/Trendyol/go-dcp-kafka/config"
"github.com/Trendyol/go-dcp-kafka/kafka/message"
"github.com/Trendyol/go-dcp/logger"
jsoniter "github.com/json-iterator/go"
"github.com/segmentio/kafka-go"
Expand All @@ -14,14 +13,12 @@ import (
type RejectionLogSinkResponseHandler struct {
Config config.Kafka
KafkaClient Client
Message *message.KafkaMessage
Topic string
}

func (r *RejectionLogSinkResponseHandler) OnInit(ctx *SinkResponseHandlerInitContext) {
r.Config = ctx.Config
r.KafkaClient = ctx.KafkaClient
r.Message = ctx.Message
r.Topic = ctx.Config.RejectionLog.Topic

err := r.KafkaClient.CheckTopics([]string{r.Topic})
Expand All @@ -36,7 +33,7 @@ func (r *RejectionLogSinkResponseHandler) OnSuccess(_ *SinkResponseHandlerContex

func (r *RejectionLogSinkResponseHandler) OnError(ctx *SinkResponseHandlerContext) {
rejectionLog := r.buildRejectionLog(ctx)
if err := r.publishToKafka(rejectionLog); err != nil {
if err := r.publishToKafka(ctx, rejectionLog); err != nil {
logger.Log.Error("failed to publish rejection log, err: %v", err)
panic(err)
}
Expand All @@ -58,7 +55,7 @@ func (r *RejectionLogSinkResponseHandler) buildRejectionLog(ctx *SinkResponseHan
return rejectionLog
}

func (r *RejectionLogSinkResponseHandler) publishToKafka(rejectionLog RejectionLog) error {
func (r *RejectionLogSinkResponseHandler) publishToKafka(ctx *SinkResponseHandlerContext, rejectionLog RejectionLog) error {
writer := r.KafkaClient.Producer()
defer func() {
if err := writer.Close(); err != nil {
Expand All @@ -75,7 +72,7 @@ func (r *RejectionLogSinkResponseHandler) publishToKafka(rejectionLog RejectionL
Topic: r.Topic,
Key: rejectionLog.Key,
Value: logBytes,
Headers: r.Message.Headers,
Headers: ctx.Message.Headers,
}

if err := writer.WriteMessages(context.Background(), kafkaMessage); err != nil {
Expand Down
1 change: 0 additions & 1 deletion kafka/sink_response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ type SinkResponseHandlerContext struct {
}

type SinkResponseHandlerInitContext struct {
SinkResponseHandlerContext
KafkaClient Client
Config config.Kafka
}
Expand Down

0 comments on commit 5e45486

Please sign in to comment.