diff --git a/couchbase/document.go b/couchbase/document.go index e1460d3..2f4cb19 100644 --- a/couchbase/document.go +++ b/couchbase/document.go @@ -14,12 +14,14 @@ type CBActionDocument struct { Source []byte ID []byte Path []byte + Size int } func NewDeleteAction(key []byte) CBActionDocument { return CBActionDocument{ ID: key, Type: Delete, + Size: len(key), } } @@ -28,6 +30,7 @@ func NewSetAction(key []byte, source []byte) CBActionDocument { ID: key, Source: source, Type: Set, + Size: len(key) + len(source), } } @@ -37,6 +40,7 @@ func NewMutateInAction(key []byte, path []byte, source []byte) CBActionDocument Source: source, Type: MutateIn, Path: path, + Size: len(key) + len(path) + len(source), } } @@ -45,5 +49,6 @@ func NewDeletePathAction(key []byte, path []byte) CBActionDocument { ID: key, Type: DeletePath, Path: path, + Size: len(key) + len(path), } } diff --git a/couchbase/processor.go b/couchbase/processor.go index 31662f2..19711c9 100644 --- a/couchbase/processor.go +++ b/couchbase/processor.go @@ -29,6 +29,7 @@ type Processor struct { requestTimeout time.Duration batchTickerDuration time.Duration batchByteSizeLimit int + batchByteSize int batchSizeLimit int batchSize int flushLock sync.Mutex @@ -38,6 +39,8 @@ type Processor struct { type Metric struct { ProcessLatencyMs int64 BulkRequestProcessLatencyMs int64 + BulkRequestSize int64 + BulkRequestByteSize int64 } func NewProcessor( @@ -88,6 +91,7 @@ func (b *Processor) flushMessages() { b.batchTicker.Reset(b.batchTickerDuration) b.batch = b.batch[:0] b.batchSize = 0 + b.batchByteSize = 0 } b.dcpCheckpointCommit() @@ -100,6 +104,7 @@ func (b *Processor) PrepareStartRebalancing() { b.isDcpRebalancing = true b.batch = b.batch[:0] b.batchSize = 0 + b.batchByteSize = 0 } func (b *Processor) PrepareEndRebalancing() { @@ -118,6 +123,9 @@ func (b *Processor) AddActions( b.flushLock.Lock() b.batch = append(b.batch, actions...) b.batchSize += len(actions) + for _, action := range actions { + b.batchByteSize += action.Size + } if isLastChunk { ctx.Ack() } @@ -126,7 +134,7 @@ func (b *Processor) AddActions( if isLastChunk { b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds() } - if b.batchSize >= b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit { + if b.batchSize >= b.batchSizeLimit || b.batchByteSize >= b.batchByteSizeLimit { b.flushMessages() } } @@ -222,4 +230,6 @@ func (b *Processor) bulkRequest() { wg.Wait() b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds() + b.metric.BulkRequestSize = int64(b.batchSize) + b.metric.BulkRequestByteSize = int64(b.batchByteSize) } diff --git a/metric/collector.go b/metric/collector.go index d39ca07..18f3b44 100644 --- a/metric/collector.go +++ b/metric/collector.go @@ -11,6 +11,8 @@ type Collector struct { processLatency *prometheus.Desc bulkRequestProcessLatency *prometheus.Desc + bulkRequestSize *prometheus.Desc + bulkRequestByteSize *prometheus.Desc } func (s *Collector) Describe(ch chan<- *prometheus.Desc) { @@ -33,6 +35,20 @@ func (s *Collector) Collect(ch chan<- prometheus.Metric) { float64(processorMetric.BulkRequestProcessLatencyMs), []string{}..., ) + + ch <- prometheus.MustNewConstMetric( + s.bulkRequestSize, + prometheus.GaugeValue, + float64(processorMetric.BulkRequestSize), + []string{}..., + ) + + ch <- prometheus.MustNewConstMetric( + s.bulkRequestByteSize, + prometheus.GaugeValue, + float64(processorMetric.BulkRequestByteSize), + []string{}..., + ) } func NewMetricCollector(processor *couchbase.Processor) *Collector { @@ -52,5 +68,19 @@ func NewMetricCollector(processor *couchbase.Processor) *Collector { []string{}, nil, ), + + bulkRequestSize: prometheus.NewDesc( + prometheus.BuildFQName(helpers.Name, "couchbase_connector_bulk_request_size", "current"), + "Couchbase connector bulk request size", + []string{}, + nil, + ), + + bulkRequestByteSize: prometheus.NewDesc( + prometheus.BuildFQName(helpers.Name, "couchbase_connector_bulk_request_byte_size", "current"), + "Couchbase connector bulk request byte size", + []string{}, + nil, + ), } }