Skip to content

Commit

Permalink
feat: expose bulk request size and bulk request byte size
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed May 2, 2024
1 parent d38e0e4 commit 43faa37
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
5 changes: 5 additions & 0 deletions couchbase/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ type CBActionDocument struct {
Source []byte
ID []byte
Path []byte
Size int
}

func NewDeleteAction(key []byte) CBActionDocument {
return CBActionDocument{
ID: key,
Type: Delete,
Size: len(key),
}
}

Expand All @@ -28,6 +30,7 @@ func NewSetAction(key []byte, source []byte) CBActionDocument {
ID: key,
Source: source,
Type: Set,
Size: len(key) + len(source),
}
}

Expand All @@ -37,6 +40,7 @@ func NewMutateInAction(key []byte, path []byte, source []byte) CBActionDocument
Source: source,
Type: MutateIn,
Path: path,
Size: len(key) + len(path) + len(source),
}
}

Expand All @@ -45,5 +49,6 @@ func NewDeletePathAction(key []byte, path []byte) CBActionDocument {
ID: key,
Type: DeletePath,
Path: path,
Size: len(key) + len(path),
}
}
12 changes: 11 additions & 1 deletion couchbase/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Processor struct {
requestTimeout time.Duration
batchTickerDuration time.Duration
batchByteSizeLimit int
batchByteSize int
batchSizeLimit int
batchSize int
flushLock sync.Mutex
Expand All @@ -38,6 +39,8 @@ type Processor struct {
type Metric struct {
ProcessLatencyMs int64
BulkRequestProcessLatencyMs int64
BulkRequestSize int64
BulkRequestByteSize int64
}

func NewProcessor(
Expand Down Expand Up @@ -88,6 +91,7 @@ func (b *Processor) flushMessages() {
b.batchTicker.Reset(b.batchTickerDuration)
b.batch = b.batch[:0]
b.batchSize = 0
b.batchByteSize = 0
}

b.dcpCheckpointCommit()
Expand All @@ -100,6 +104,7 @@ func (b *Processor) PrepareStartRebalancing() {
b.isDcpRebalancing = true
b.batch = b.batch[:0]
b.batchSize = 0
b.batchByteSize = 0
}

func (b *Processor) PrepareEndRebalancing() {
Expand All @@ -118,6 +123,9 @@ func (b *Processor) AddActions(
b.flushLock.Lock()
b.batch = append(b.batch, actions...)
b.batchSize += len(actions)
for _, action := range actions {
b.batchByteSize += action.Size
}
if isLastChunk {
ctx.Ack()
}
Expand All @@ -126,7 +134,7 @@ func (b *Processor) AddActions(
if isLastChunk {
b.metric.ProcessLatencyMs = time.Since(eventTime).Milliseconds()
}
if b.batchSize >= b.batchSizeLimit || len(b.batch) >= b.batchByteSizeLimit {
if b.batchSize >= b.batchSizeLimit || b.batchByteSize >= b.batchByteSizeLimit {
b.flushMessages()
}
}
Expand Down Expand Up @@ -222,4 +230,6 @@ func (b *Processor) bulkRequest() {

wg.Wait()
b.metric.BulkRequestProcessLatencyMs = time.Since(startedTime).Milliseconds()
b.metric.BulkRequestSize = int64(b.batchSize)
b.metric.BulkRequestByteSize = int64(b.batchByteSize)
}
30 changes: 30 additions & 0 deletions metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type Collector struct {

processLatency *prometheus.Desc
bulkRequestProcessLatency *prometheus.Desc
bulkRequestSize *prometheus.Desc
bulkRequestByteSize *prometheus.Desc
}

func (s *Collector) Describe(ch chan<- *prometheus.Desc) {
Expand All @@ -33,6 +35,20 @@ func (s *Collector) Collect(ch chan<- prometheus.Metric) {
float64(processorMetric.BulkRequestProcessLatencyMs),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.bulkRequestSize,
prometheus.GaugeValue,
float64(processorMetric.BulkRequestSize),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.bulkRequestByteSize,
prometheus.GaugeValue,
float64(processorMetric.BulkRequestByteSize),
[]string{}...,
)
}

func NewMetricCollector(processor *couchbase.Processor) *Collector {
Expand All @@ -52,5 +68,19 @@ func NewMetricCollector(processor *couchbase.Processor) *Collector {
[]string{},
nil,
),

bulkRequestSize: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "couchbase_connector_bulk_request_size", "current"),
"Couchbase connector bulk request size",
[]string{},
nil,
),

bulkRequestByteSize: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "couchbase_connector_bulk_request_byte_size", "current"),
"Couchbase connector bulk request byte size",
[]string{},
nil,
),
}
}

0 comments on commit 43faa37

Please sign in to comment.