diff --git a/config.sample.toml b/config.sample.toml
index 4031379..7b65c58 100644
--- a/config.sample.toml
+++ b/config.sample.toml
@@ -15,10 +15,6 @@ source_topic2 = "target_topic2"
 [source_pool]
 # Kafka client config common to all upstream sources ([[sources]]).
 initial_offset = "start"
-# Static memmbership to pin the member for the consumer group for respawn / reconnect and fence other members from connecting using the same id.
-instance_id = "client_instance_id"
-# Consumer group id.
-group_id = "consumer_group"
 
 # Frequency at which source servers are polled for health/lag.
 healthcheck_interval = "3s"
@@ -109,6 +105,7 @@ ca_cert_path = ""
 max_retries = -1
 flush_batch_size = 1000
 batch_size = 1000
+buffer_size = 100000 # channel buffer length
 max_message_bytes = 10000000
 
 # Kafka exponential retry-backoff config for reconnection attempts.
diff --git a/init.go b/init.go
index bde3cf2..36adc17 100644
--- a/init.go
+++ b/init.go
@@ -92,14 +92,12 @@ func initSourcePoolConfig(ko *koanf.Koanf) relay.SourcePoolCfg {
         EnableBackoff: ko.Bool("source_pool.backoff_enable"),
         BackoffMin:    ko.MustDuration("source_pool.backoff_min"),
         BackoffMax:    ko.MustDuration("source_pool.backoff_max"),
-        GroupID:       ko.MustString("source_pool.group_id"),
-        InstanceID:    ko.MustString("source_pool.instance_id"),
     }
 }
 
 func initRelayConfig(ko *koanf.Koanf) relay.RelayCfg {
     return relay.RelayCfg{
-        StopAtEnd: ko.Bool("stop_at_end"),
+        StopAtEnd: ko.Bool("stop-at-end"),
     }
 }
 
@@ -175,10 +173,10 @@ func initTopicsMap(ko *koanf.Koanf) relay.Topics {
 }
 
 // initKafkaConfig reads the source(s)/target Kafka configuration.
-func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerCfg) {
+func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerCfg, relay.ProducerCfg) {
     // Read source Kafka config.
     src := struct {
-        Sources []relay.ConsumerGroupCfg `koanf:"sources"`
+        Sources []relay.ConsumerCfg `koanf:"sources"`
     }{}
 
     if err := ko.Unmarshal("", &src); err != nil {
@@ -237,11 +235,11 @@ func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider,
         }
 
         var cfg filter.Config
-        if err := ko.Unmarshal("filter."+id, &cfg); err != nil {
+        if err := ko.Unmarshal("filters."+id, &cfg); err != nil {
             log.Fatalf("error unmarshalling filter config: %s: %v", id, err)
         }
         if cfg.Config == "" {
-            lo.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id))
+            lo.Info(fmt.Sprintf("WARNING: No config 'filters.%s' for '%s' in config", id, id))
         }
 
         // Initialize the plugin.
diff --git a/internal/relay/config.go b/internal/relay/config.go
index 8014a06..3445463 100644
--- a/internal/relay/config.go
+++ b/internal/relay/config.go
@@ -42,8 +42,8 @@ type KafkaCfg struct {
     EnableLog bool `koanf:"enable_log"`
 }
 
-// ConsumerGroupCfg is the consumer group specific config.
-type ConsumerGroupCfg struct {
+// ConsumerCfg is the direct consumer config.
+type ConsumerCfg struct {
     KafkaCfg `koanf:",squash"`
 }
 
@@ -57,6 +57,7 @@ type ProducerCfg struct {
     FlushFrequency  time.Duration `koanf:"flush_frequency"`
     MaxMessageBytes int           `koanf:"max_message_bytes"`
     BatchSize       int           `koanf:"batch_size"`
+    BufferSize      int           `koanf:"buffer_size"`
     FlushBatchSize  int           `koanf:"flush_batch_size"`
     Compression     string        `koanf:"compression"` // gzip|snappy|lz4|zstd|none
 }
diff --git a/internal/relay/relay.go b/internal/relay/relay.go
index 6363da5..81d0a06 100644
--- a/internal/relay/relay.go
+++ b/internal/relay/relay.go
@@ -12,7 +12,7 @@ import (
 )
 
 type RelayCfg struct {
-    StopAtEnd bool `koanf:"stop_at_end"`
+    StopAtEnd bool
 }
 
 // Relay represents the input, output kafka and the remapping necessary to forward messages from one topic to another.
@@ -30,7 +30,7 @@ type Relay struct {
     // If stop-at-end is enabled, the "end" offsets of the source
     // read at the time of boot are cached here to compare against
     // live offsets and stop consumption.
-    targetOffsets map[string]map[int32]kgo.Offset
+    targetOffsets TopicOffsets
 
     // Live topic offsets from source.
     srcOffsets map[string]map[int32]int64
@@ -42,7 +42,7 @@
 func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filters map[string]filter.Provider, log *slog.Logger) (*Relay, error) {
     // If stop-at-end is set, fetch and cache the offsets to determine
     // when end is reached.
-    var offsets map[string]map[int32]kgo.Offset
+    var offsets TopicOffsets
     if cfg.StopAtEnd {
         if o, err := target.GetHighWatermark(); err != nil {
             return nil, err
@@ -115,7 +115,7 @@ func (re *Relay) Start(globalCtx context.Context) error {
     go func() {
         defer wg.Done()
         // Wait till main ctx is cancelled.
-        <-globalCtx.Done()
+        <-ctx.Done()
 
         // Stop consumer group.
         re.source.Close()
@@ -133,6 +133,7 @@ func (re *Relay) Start(globalCtx context.Context) error {
     // Close producer.
     re.target.Close()
+    cancel()
 
     wg.Wait()
 
     return nil
@@ -223,7 +224,7 @@ loop:
         rec := iter.Next()
         // Always record the latest offsets before the messages are processed for new connections and
         // retries to consume from where it was left off.
-        // NOTE: What if the next step fails? The messages won't be read again?
+        // TODO: What if the next step fails? The messages won't be read again?
         re.source.RecordOffsets(rec)
 
         if err := re.processMessage(ctx, rec); err != nil {
diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go
index 534dfdf..5f37274 100644
--- a/internal/relay/source_pool.go
+++ b/internal/relay/source_pool.go
@@ -22,15 +22,12 @@ type SourcePoolCfg struct {
     EnableBackoff bool
     BackoffMin    time.Duration
     BackoffMax    time.Duration
-
-    GroupID    string
-    InstanceID string
 }
 
 // Server represents a source Server's config with health and weight
 // parameters which are used for tracking health status.
 type Server struct {
-    Config ConsumerGroupCfg
+    Config ConsumerCfg
     ID     int
 
     // Weight is the cumulative high watermark (offset) of every single topic
@@ -47,15 +44,23 @@ type Server struct {
     Client *kgo.Client
 }
 
+// TopicOffsets defines a topic->partition->offset map for any src/target Kafka cluster.
+type TopicOffsets map[string]map[int32]kgo.Offset
+
 // SourcePool manages the source Kafka instances and consumption.
 type SourcePool struct {
-    cfg     SourcePoolCfg
-    client  *kgo.Client
-    log     *slog.Logger
-    metrics *metrics.Set
-    topics  []string
-
-    offsets map[string]map[int32]kgo.Offset
+    cfg         SourcePoolCfg
+    client      *kgo.Client
+    log         *slog.Logger
+    metrics     *metrics.Set
+    targetToSrc map[string]string
+    srcTopics   []string
+
+    // targetOffsets is initialized with the current topic high watermarks from the target.
+    // These are updated whenever new msgs from src are sent to target for producing.
+    // Whenever a new direct src consumer starts consuming from the respective topic, it uses
+    // the offsets from this map. (These updates happen independently in the pool loop, hence no lock.)
+    targetOffsets TopicOffsets
 
     // List of all source servers.
     servers []Server
@@ -84,7 +89,7 @@ var (
 
 // NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer)
 // servers. The pool always attempts to find one healthy node for the relay to consume from.
-func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*SourcePool, error) {
+func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topics Topics, targetOffsets TopicOffsets, m *metrics.Set, log *slog.Logger) (*SourcePool, error) {
     servers := make([]Server, 0, len(serverCfgs))
 
     // Initially mark all servers as unhealthy.
@@ -97,24 +102,32 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topi
         })
     }
 
-    topicNames := make([]string, 0, len(topics))
-    for t := range topics {
-        topicNames = append(topicNames, t)
+    var (
+        targToSrc = make(map[string]string, len(topics))
+        srcTopics = make([]string, 0, len(topics))
+    )
+    for src, targ := range topics {
+        srcTopics = append(srcTopics, src)
+        targToSrc[targ.TargetTopic] = src
     }
 
-    return &SourcePool{
-        cfg:       cfg,
-        topics:    topicNames,
-        servers:   servers,
-        log:       log,
-        metrics:   m,
-        backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
-    }, nil
+    sp := &SourcePool{
+        cfg:         cfg,
+        targetToSrc: targToSrc,
+        srcTopics:   srcTopics,
+        servers:     servers,
+        log:         log,
+        metrics:     m,
+        backoffFn:   getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
+    }
+
+    sp.setInitialOffsets(targetOffsets)
+    return sp, nil
 }
 
-// SetInitialOffsets sets the offset/weight from the target on boot so that the messages
+// setInitialOffsets sets the offset/weight from the target on boot so that the messages
 // can be consumed from the offsets where they were left off.
-func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
+func (sp *SourcePool) setInitialOffsets(of TopicOffsets) {
     // Assign the current weight as initial target offset.
     // This is done to resume if target already has messages published from src.
     var w int64
@@ -124,7 +137,7 @@ func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
         }
     }
 
-    sp.offsets = of
+    sp.targetOffsets = of
 
     // Set the current candidate with initial weight and a placeholder ID. This initial
     // weight ensures we resume consuming from where last left off. A real
@@ -152,7 +165,8 @@ loop:
         }
 
         // Get the config for a healthy node.
-        if s, err := sp.getCurCandidate(); err == nil {
+        s, err := sp.getCurCandidate()
+        if err == nil {
             sp.log.Debug("attempting new source connection", "id", s.ID, "broker", s.Config.BootstrapBrokers, "retries", retries)
             conn, err := sp.newConn(globalCtx, s)
             if err != nil {
@@ -163,7 +177,7 @@ loop:
                 continue loop
             }
 
-            // Cache the current live connection internally.
+            // XXX: Cache the current live connection internally.
             sp.client = conn
 
             out := s
@@ -175,7 +189,7 @@ loop:
         retries++
         sp.metrics.GetOrCreateCounter(SrcsUnhealthyMetric).Inc()
-        sp.log.Error("no healthy server found. waiting and retrying", "retries", retries)
+        sp.log.Error("no healthy server found. waiting and retrying", "retries", retries, "error", err)
         waitTries(globalCtx, sp.backoffFn(retries))
     }
 }
 
@@ -210,24 +224,24 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
 // RecordOffsets records the offsets of the latest fetched records per topic.
 // This is used to resume consumption on new connections/reconnections from the source during runtime.
 func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
-    if sp.offsets == nil {
-        sp.offsets = make(map[string]map[int32]kgo.Offset)
+    if sp.targetOffsets == nil {
+        sp.targetOffsets = make(TopicOffsets)
     }
 
-    if o, ok := sp.offsets[rec.Topic]; ok {
+    if o, ok := sp.targetOffsets[rec.Topic]; ok {
         // If the topic already exists, update the offset for the partition.
         o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
-        sp.offsets[rec.Topic] = o
+        sp.targetOffsets[rec.Topic] = o
     } else {
         // If the topic does not exist, create a new map for the topic.
         o := make(map[int32]kgo.Offset)
         o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
-        sp.offsets[rec.Topic] = o
+        sp.targetOffsets[rec.Topic] = o
     }
 }
 
 func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error) {
-    return getHighWatermark(ctx, cl, sp.topics, sp.cfg.ReqTimeout)
+    return getHighWatermark(ctx, cl, sp.srcTopics, sp.cfg.ReqTimeout)
 }
 
 // Close closes the active source Kafka client.
@@ -246,27 +260,12 @@ func (sp *SourcePool) newConn(ctx context.Context, s Server) (*kgo.Client, error) {
     }
 
     sp.log.Debug("initiazing new source consumer", "id", s.ID, "server", s.Config.BootstrapBrokers)
-    cl, err := sp.initConsumerGroup(ctx, s.Config)
+    cl, err := sp.initConsumer(s.Config)
     if err != nil {
         sp.log.Error("error initiazing source consumer", "id", s.ID, "server", s.Config.BootstrapBrokers)
         return nil, err
     }
 
-    if sp.offsets != nil {
-        sp.log.Debug("resetting cached offsets", "id", s.ID, "server", s.Config.BootstrapBrokers, "offsets", sp.offsets)
-        if err := sp.leaveAndResetOffsets(ctx, cl, s); err != nil {
-            sp.log.Error("error resetting cached offsets", "id", s.ID, "server", s.Config.BootstrapBrokers, "error", err)
-            return nil, err
-        }
-
-        sp.log.Debug("initiazing new source consumer after clearing offsets", "id", s.ID, "server", s.Config.BootstrapBrokers)
-        cl, err = sp.initConsumerGroup(ctx, s.Config)
-        if err != nil {
-            sp.log.Error("error initiazing source consumer after offset reset", "id", s.ID, "server", s.Config.BootstrapBrokers)
-            return nil, err
-        }
-    }
-
     return cl, nil
 }
 
@@ -380,33 +379,36 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err
     }
 }
 
-// initConsumerGroup initializes a Kafka consumer group. This is used for creating consumer connection to source servers.
-func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCfg) (*kgo.Client, error) {
-    assingedCtx, cancelFn := context.WithTimeout(ctx, cfg.SessionTimeout)
-    defer cancelFn()
+var offsetPool = sync.Pool{
+    New: func() interface{} {
+        return make(TopicOffsets)
+    },
+}
 
-    onAssigned := func(childCtx context.Context, cl *kgo.Client, claims map[string][]int32) {
-        select {
-        case <-ctx.Done():
-            return
-        case <-childCtx.Done():
-            return
-        default:
-            sp.log.Debug("partition assigned", "broker", cl.OptValue(kgo.SeedBrokers), "claims", claims)
-            cancelFn()
+// initConsumer initializes a Kafka consumer client. This is used for creating a consumer connection to source servers.
+func (sp *SourcePool) initConsumer(cfg ConsumerCfg) (*kgo.Client, error) {
+    srcOffsets := offsetPool.Get().(TopicOffsets)
+    defer func() {
+        for k := range srcOffsets {
+            delete(srcOffsets, k)
+        }
+        offsetPool.Put(srcOffsets)
+    }()
+
+    // For each target topic get the relevant src topic to configure direct
+    // consumer to start from target topic's last offset.
+    for t, of := range sp.targetOffsets {
+        src, ok := sp.targetToSrc[t]
+        if !ok {
+            return nil, fmt.Errorf("src topic not found for target %s in map", t)
         }
+        srcOffsets[src] = of
     }
 
     opts := []kgo.Opt{
+        kgo.ConsumePartitions(srcOffsets),
         kgo.SeedBrokers(cfg.BootstrapBrokers...),
         kgo.FetchMaxWait(sp.cfg.ReqTimeout),
-        kgo.ConsumeTopics(sp.topics...),
-        kgo.ConsumerGroup(sp.cfg.GroupID),
-        kgo.InstanceID(sp.cfg.InstanceID),
-        kgo.SessionTimeout(cfg.SessionTimeout),
-        kgo.DisableAutoCommit(),
-        kgo.OnPartitionsAssigned(onAssigned),
-        kgo.BlockRebalanceOnPoll(),
     }
 
     if cfg.EnableLog {
@@ -436,22 +438,15 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
         return nil, err
     }
 
-    if err := testConnection(cl, cfg.SessionTimeout, sp.topics, nil); err != nil {
+    if err := testConnection(cl, cfg.SessionTimeout, sp.srcTopics, nil); err != nil {
         return nil, err
     }
 
-    sp.log.Debug("waiting for source partition assignment", "server", cfg.BootstrapBrokers)
-    <-assingedCtx.Done()
-    if assingedCtx.Err() == context.DeadlineExceeded {
-        return nil, fmt.Errorf("timeout waiting for partition assingnment in target: %v: %w", cfg.BootstrapBrokers, assingedCtx.Err())
-    }
-    sp.log.Debug("partition assigned", "server", cfg.BootstrapBrokers)
-
     return cl, nil
 }
 
 // initConsumerClient returns franz-go client with default config.
-func (sp *SourcePool) initConsumerClient(cfg ConsumerGroupCfg) (*kgo.Client, error) {
+func (sp *SourcePool) initConsumerClient(cfg ConsumerCfg) (*kgo.Client, error) {
     opts := []kgo.Opt{
         kgo.SeedBrokers(cfg.BootstrapBrokers...),
         kgo.FetchMaxWait(sp.cfg.ReqTimeout),
@@ -531,117 +526,3 @@ func (sp *SourcePool) setWeight(id int, weight int64) {
         break
     }
 }
-
-// leaveAndResetOffsets leaves the current consumer group and resets its offset if given.
-func (sp *SourcePool) leaveAndResetOffsets(ctx context.Context, cl *kgo.Client, s Server) error {
-    // leave group; mark the group as `Empty` before attempting to reset offsets.
-    sp.log.Debug("leaving group", "id", s.ID, "server", s.Config.BootstrapBrokers)
-    if err := sp.leaveGroup(ctx, cl); err != nil {
-        return err
-    }
-
-    // Reset consumer group offsets using the existing offsets
-    if sp.offsets != nil {
-        sp.log.Debug("resetting offsets", "id", s.ID, "server", s.Config.BootstrapBrokers, "offsets", sp.offsets)
-        if err := sp.resetOffsets(ctx, cl, s); err != nil {
-            return err
-        }
-    }
-
-    return nil
-}
-
-// resetOffsets resets the consumer group with the given offsets map.
-// Also waits for topics to catch up to the messages in case it is lagging behind.
-func (sp *SourcePool) resetOffsets(ctx context.Context, cl *kgo.Client, s Server) error {
-    var (
-        maxAttempts = -1 // TODO: make this configurable?
-        attempts    = 0
-        admCl       = kadm.NewClient(cl)
-    )
-
-    // wait for topic lap to catch up
-waitForTopicLag:
-    for {
-        select {
-        case <-ctx.Done():
-            return nil
-        default:
-            if attempts >= maxAttempts && maxAttempts != IndefiniteRetry {
-                return fmt.Errorf("max attempts(%d) for fetching offsets", maxAttempts)
-            }
-
-            // Get end offsets of the topics
-            topicOffsets, err := admCl.ListEndOffsets(ctx, sp.topics...)
-            if err != nil {
-                sp.log.Error("error fetching offsets", "err", err)
-                return err
-            }
-
-            for t, po := range sp.offsets {
-                for p, o := range po {
-                    eO, ok := topicOffsets.Lookup(t, p)
-                    // TODO:
-                    if !ok {
-                        continue
-                    }
-
-                    if o.EpochOffset().Offset > eO.Offset {
-                        return fmt.Errorf("%w by %d msgs(s)", ErrLaggingBehind, o.EpochOffset().Offset-eO.Offset)
-                    }
-                }
-            }
-
-            break waitForTopicLag
-        }
-    }
-
-    // force set consumer group offsets
-    of := make(kadm.Offsets)
-    for t, po := range sp.offsets {
-        oMap := make(map[int32]kgo.EpochOffset)
-        for p, o := range po {
-            oMap[p] = o.EpochOffset()
-            of.AddOffset(t, p, o.EpochOffset().Offset, -1)
-        }
-    }
-
-    sp.log.Info("resetting offsets for consumer group", "id", s.ID, "server", s.Config.BootstrapBrokers, "offsets", of)
-    resp, err := admCl.CommitOffsets(ctx, sp.cfg.GroupID, of)
-    if err != nil {
-        return fmt.Errorf("error resetting group offset: %w", err)
-    }
-
-    if err := resp.Error(); err != nil {
-        return fmt.Errorf("error resetting group offset: %w", err)
-    }
-
-    // _ = resp
-    // // check for errors in offset responses
-    // for _, or := range resp {
-    //     for _, r := range or {
-    //         if r.Err != nil {
-    //             l.Error("error resetting group offset", "err", r.Err)
-    //             return err
-    //         }
-    //     }
-    // }
-
-    return nil
-}
-
-// leaveGroup makes the given client leave from a consumer group.
-func (sp *SourcePool) leaveGroup(ctx context.Context, cl *kgo.Client) error {
-    l := kadm.LeaveGroup(sp.cfg.GroupID).Reason("resetting offsets").InstanceIDs(sp.cfg.InstanceID)
-
-    resp, err := kadm.NewClient(cl).LeaveGroup(ctx, l)
-    if err != nil {
-        return err
-    }
-
-    if err := resp.Error(); err != nil {
-        return err
-    }
-
-    return nil
-}
diff --git a/internal/relay/target.go b/internal/relay/target.go
index 16f2406..d74257e 100644
--- a/internal/relay/target.go
+++ b/internal/relay/target.go
@@ -49,7 +49,7 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic
         targetTopics: topics,
 
         batch:   make([]*kgo.Record, 0, pCfg.BatchSize),
-        inletCh: make(chan *kgo.Record, pCfg.BatchSize*10),
+        inletCh: make(chan *kgo.Record, pCfg.BufferSize),
     }
 
     // Initialize the actual Kafka client.
diff --git a/main.go b/main.go
index 9b4270a..8aa1aea 100644
--- a/main.go
+++ b/main.go
@@ -72,13 +72,11 @@ func main() {
     }
 
     // Initialize the source Kafka (consumer) relay.
-    srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, metr, lo)
+    srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, hOf.KOffsets(), metr, lo)
     if err != nil {
         log.Fatalf("error initializing source pool controller: %v", err)
     }
 
-    srcPool.SetInitialOffsets(hOf.KOffsets())
-
     // Initialize the Relay which orchestrates consumption from the sourcePool
     // and writing to the target pool.
     relay, err := relay.NewRelay(initRelayConfig(ko), srcPool, target, topics, filters, lo)
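
Editor's note: the core of this patch replaces the consumer-group subscription (ConsumerGroup/InstanceID, partition-assignment callbacks, offset resets via the group) with a direct partition consumer seeded from offsets tracked against the target. Below is a minimal, self-contained sketch of that franz-go pattern, not the repository's code: the broker address, topic name, partition, and starting offset are hypothetical placeholders, and the offset bookkeeping only loosely mirrors RecordOffsets.

package main

import (
    "context"
    "log"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    // Offsets to resume from (topic -> partition -> next offset to consume),
    // e.g. derived from the target cluster's high watermarks.
    // "topic_a", partition 0, and offset 42 are placeholder values.
    resume := map[string]map[int32]kgo.Offset{
        "topic_a": {0: kgo.NewOffset().At(42)},
    }

    cl, err := kgo.NewClient(
        kgo.SeedBrokers("localhost:9092"), // placeholder broker
        // Direct partition consumption: no consumer group, no rebalances;
        // each partition starts exactly at the offset given above.
        kgo.ConsumePartitions(resume),
    )
    if err != nil {
        log.Fatalf("error creating consumer: %v", err)
    }
    defer cl.Close()

    fetches := cl.PollFetches(context.Background())
    fetches.EachRecord(func(rec *kgo.Record) {
        // Track the next offset per topic/partition so a reconnect can resume
        // where this one left off (the same idea as SourcePool.RecordOffsets).
        resume[rec.Topic][rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
        log.Printf("topic=%s partition=%d offset=%d", rec.Topic, rec.Partition, rec.Offset)
    })
}

Because there is no group coordinator involved, resumption depends entirely on the offsets passed to ConsumePartitions, which is why the pool now carries targetOffsets instead of committing to a group.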
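Relatedly, main.go now seeds NewSourcePool directly with hOf.KOffsets(). As a rough illustration of where such a map can come from, the sketch below lists a cluster's end offsets with kadm and converts them via KOffsets(); the broker address, topic name, and the helper name targetHighWatermarks are assumptions for illustration, not part of this patch.

package main

import (
    "context"
    "log"
    "time"

    "github.com/twmb/franz-go/pkg/kadm"
    "github.com/twmb/franz-go/pkg/kgo"
)

// targetHighWatermarks fetches the end offsets of the given topics and
// converts them into the topic -> partition -> kgo.Offset shape that a
// direct consumer (or NewSourcePool in this patch) can be seeded with.
func targetHighWatermarks(ctx context.Context, cl *kgo.Client, topics []string) (map[string]map[int32]kgo.Offset, error) {
    adm := kadm.NewClient(cl)
    listed, err := adm.ListEndOffsets(ctx, topics...)
    if err != nil {
        return nil, err
    }
    // KOffsets converts kadm's listing into kgo-compatible offsets.
    return listed.KOffsets(), nil
}

func main() {
    cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
    if err != nil {
        log.Fatalf("error creating client: %v", err)
    }
    defer cl.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    offsets, err := targetHighWatermarks(ctx, cl, []string{"target_topic1"}) // placeholder topic
    if err != nil {
        log.Fatalf("error listing end offsets: %v", err)
    }
    log.Printf("seed offsets: %v", offsets)
}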