Merge #137003
137003: cli: update CLI output for tsdump upload r=aa-joshi a=aa-joshi

Previously, we logged only the first metric from a tsdump Datadog upload request along with the error, and the messaging on failure did not make clear whether the upload succeeded or failed. This patch improves the output based on the upload state, so users can clearly differentiate between success, partial success, and failure. In the case of partial success, it lists all metrics that failed to upload. The patch also updates the retry count and backoff configuration for the upload.

Epic: None
Part of: CRDB-44835
Release note: None
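
To illustrate the reporting pattern described above, here is a minimal, self-contained Go sketch (simplified; metric names and batch layout are illustrative, and the actual implementation is in pkg/cli/tsdump_upload.go in the diff below). It collects failed metric names in a mutex-protected set, marks overall success once any batch succeeds, and prints success, partial success, or failure accordingly:

```go
// Sketch only (not the committed code): a simplified model of the reporting
// pattern described above. The real implementation is in
// pkg/cli/tsdump_upload.go in the diff below. Metric names are illustrative.
// Requires Go 1.21+ for sync.OnceFunc.
package main

import (
	"fmt"
	"sync"
)

func main() {
	// Failed metric names are collected in a mutex-protected set so names
	// stay unique across concurrent upload batches; a single successful
	// batch is enough to flip the overall status to Success.
	var state struct {
		sync.Mutex
		failedMetrics map[string]struct{}
		anySucceeded  bool
	}
	state.failedMetrics = make(map[string]struct{})
	markSuccessOnce := sync.OnceFunc(func() { state.anySucceeded = true })

	// Pretend two upload batches ran concurrently: one succeeded, one failed.
	batches := []struct {
		metrics []string
		err     error
	}{
		{metrics: []string{"crdb.tsdump.sql.query.count"}, err: nil},
		{metrics: []string{"crdb.tsdump.store.capacity"}, err: fmt.Errorf("transient network error")},
	}
	var wg sync.WaitGroup
	for _, b := range batches {
		wg.Add(1)
		go func(metrics []string, err error) {
			defer wg.Done()
			if err != nil {
				state.Lock()
				defer state.Unlock()
				for _, m := range metrics {
					state.failedMetrics[m] = struct{}{}
				}
				return
			}
			markSuccessOnce()
		}(b.metrics, b.err)
	}
	wg.Wait()

	// Report one of three outcomes: success, partial success with the list
	// of failed metrics, or outright failure.
	switch {
	case state.anySucceeded && len(state.failedMetrics) == 0:
		fmt.Println("Upload status: Success!")
	case state.anySucceeded:
		fmt.Printf("Upload status: Success!\n%d metrics failed to upload:\n", len(state.failedMetrics))
		for m := range state.failedMetrics {
			fmt.Println(" -", m)
		}
	default:
		fmt.Println("Upload status: Failed! Please re-upload the Tsdump.")
	}
}
```

Tracking failures as a set keeps the reported metric names unique even when the same metric fails across multiple concurrent batches, matching the comment in the diff below.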

- Upload partial success
[partial_success.txt](https://github.com/user-attachments/files/18059610/partial_success.txt)

- Upload failed 

<img width="1842" alt="upload_failed" src="https://github.com/user-attachments/assets/e6b6c6ab-b968-41a7-9656-a979ebcfee38">

-  Upload success

<img width="1905" alt="upload_success" src="https://github.com/user-attachments/assets/31d795d5-9c54-4049-b5f1-60ed476b3995">


Co-authored-by: Akshay Joshi <[email protected]>
craig[bot] and aa-joshi committed Dec 11, 2024
2 parents d18eb68 + 355dd14 commit 5b9cad9
Showing 2 changed files with 61 additions and 19 deletions.
4 changes: 2 additions & 2 deletions pkg/cli/tsdump_test.go
@@ -164,7 +164,7 @@ func parseDDInput(t *testing.T, input string, w *datadogWriter) {
(data != nil && data.Metric != metricName ||
(data != nil && source != nameValueTimestamp[1])) {
if data != nil {
err := w.emitDataDogMetrics([]DatadogSeries{*data})
_, err := w.emitDataDogMetrics([]DatadogSeries{*data})
require.NoError(t, err)
}
data = &DatadogSeries{
@@ -182,7 +182,7 @@ func parseDDInput(t *testing.T, input string, w *datadogWriter) {
Timestamp: ts,
})
}
err := w.emitDataDogMetrics([]DatadogSeries{*data})
_, err := w.emitDataDogMetrics([]DatadogSeries{*data})
require.NoError(t, err)
}

76 changes: 59 additions & 17 deletions pkg/cli/tsdump_upload.go
@@ -37,6 +37,10 @@ const (
DatadogSeriesTypeRate
DatadogSeriesTypeGauge
)
const (
UploadStatusSuccess = "Success"
UploadStatusFailure = "Failed"
)

var (
// each site in datadog has a different host name. ddSiteToHostMap
@@ -207,12 +211,14 @@ func dump(kv *roachpb.KeyValue) (*DatadogSeries, error) {

var printLock syncutil.Mutex

func (d *datadogWriter) emitDataDogMetrics(data []DatadogSeries) error {
func (d *datadogWriter) emitDataDogMetrics(data []DatadogSeries) ([]string, error) {
tags := getUploadTags(d)

emittedMetrics := make([]string, len(data))
for i := 0; i < len(data); i++ {
data[i].Tags = append(data[i].Tags, tags...)
data[i].Metric = d.namePrefix + data[i].Metric
emittedMetrics[i] = data[i].Metric
}

// When running in init mode, we insert zeros with the current
@@ -242,7 +248,7 @@ func (d *datadogWriter) emitDataDogMetrics(data []DatadogSeries) error {
)
}()

return d.flush(data)
return emittedMetrics, d.flush(data)
}

func getUploadTags(d *datadogWriter) []string {
@@ -294,7 +300,8 @@ func (d *datadogWriter) flush(data []DatadogSeries) error {
}

retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 5
retryOpts.MaxBackoff = 2 * time.Second
retryOpts.MaxRetries = 100
var req *http.Request
for retry := retry.Start(retryOpts); retry.Next(); {
req, err = http.NewRequest("POST", d.targetURL, &zipBuf)
@@ -344,12 +351,20 @@ func (d *datadogWriter) upload(fileName string) error {
var wg sync.WaitGroup
ch := make(chan []DatadogSeries, 4000)

// errorsInDDUpload wraps the error collection in a mutex since
// metricsUploadState wraps the failed metrics collection in a mutex since
// they're collected concurrently.
var errorsInDDUpload struct {
var metricsUploadState struct {
syncutil.Mutex
errors []string
// we are taking map here as we want unique metric names which were failed across all tsdump upload requests.
uploadFailedMetrics map[string]struct{}
isSingleUploadSucceeded bool
}
metricsUploadState.uploadFailedMetrics = make(map[string]struct{})
metricsUploadState.isSingleUploadSucceeded = false

markSuccessOnce := sync.OnceFunc(func() {
metricsUploadState.isSingleUploadSucceeded = true
})

// Note(davidh): This was previously set at 1000 and we'd get regular
// 400s from Datadog with the cryptic `Unable to decompress payload`
@@ -358,16 +373,19 @@
for i := 0; i < 10; i++ {
go func() {
for data := range ch {
err := d.emitDataDogMetrics(data)
emittedMetrics, err := d.emitDataDogMetrics(data)
if err != nil {
func() {
errorsInDDUpload.Lock()
defer errorsInDDUpload.Unlock()
errorsInDDUpload.errors = append(errorsInDDUpload.errors,
fmt.Sprintf("retries exhausted for datadog upload for series %s with error %v\n", data[0].Metric, err))
metricsUploadState.Lock()
defer metricsUploadState.Unlock()
for _, metric := range emittedMetrics {
metricsUploadState.uploadFailedMetrics[metric] = struct{}{}
}
}()
} else {
func() {
markSuccessOnce()
}()
wg.Done()
return
}
wg.Done()
}
@@ -394,11 +412,35 @@
year, month, day := d.uploadTime.Date()
dashboardLink := fmt.Sprintf(datadogDashboardURLFormat, debugTimeSeriesDumpOpts.clusterLabel, d.uploadID, day, int(month), year, fromUnixTimestamp, toUnixTimestamp)

if len(errorsInDDUpload.errors) != 0 {
fmt.Printf("\n%d upload errors occurred:\n%s\n", len(errorsInDDUpload.errors), strings.Join(errorsInDDUpload.errors, "\n"))
var uploadStatus string
if metricsUploadState.isSingleUploadSucceeded {
uploadStatus = UploadStatusSuccess
} else {
uploadStatus = UploadStatusFailure
}
fmt.Printf("\nUpload status: %s!\n", uploadStatus)

if metricsUploadState.isSingleUploadSucceeded {
if len(metricsUploadState.uploadFailedMetrics) != 0 {
fmt.Printf("The Tsdump upload to Datadog succeeded but %d metrics partially failed to upload."+
" These failures can be due to transietnt network errors. If any of these metrics are critical for your investigation,"+
" please re-upload the Tsdump:\n%s\n", len(metricsUploadState.uploadFailedMetrics), strings.Join(func() []string {
var failedMetricsList []string
index := 1
for metric := range metricsUploadState.uploadFailedMetrics {
metric = fmt.Sprintf("%d) %s", index, metric)
failedMetricsList = append(failedMetricsList, metric)
index++
}
return failedMetricsList
}(), "\n"))
}

fmt.Println("\nupload id: ", d.uploadID)
fmt.Printf("datadog dashboard link: %s\n", dashboardLink)
} else {
fmt.Println("All metric upload is failed. Please re-upload the Tsdump.")
}
fmt.Println("\nupload id:", d.uploadID)
fmt.Printf("datadog dashboard link: %s\n", dashboardLink)

close(ch)
return nil
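
The diff above also bumps the retry settings in flush (MaxRetries from 5 to 100, with MaxBackoff capped at 2s). As a rough, standalone illustration of what a capped exponential backoff with a retry limit does — this sketch does not use the real cockroachdb base/retry packages, and the initial backoff and multiplier values are assumptions:

```go
// Illustrative sketch of capped exponential backoff with a retry limit,
// mirroring the intent of retryOpts.MaxBackoff = 2s and MaxRetries = 100
// above. It does not use the real cockroachdb retry package; the initial
// backoff and multiplier are assumed values.
package main

import (
	"errors"
	"fmt"
	"time"
)

type retryOptions struct {
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	Multiplier     float64
	MaxRetries     int
}

// doWithRetry calls fn until it succeeds or MaxRetries attempts have failed,
// growing the sleep between attempts by Multiplier up to MaxBackoff.
func doWithRetry(opts retryOptions, fn func() error) error {
	backoff := opts.InitialBackoff
	var err error
	for attempt := 1; attempt <= opts.MaxRetries; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		fmt.Printf("attempt %d failed (%v); retrying in %s\n", attempt, err, backoff)
		time.Sleep(backoff)
		backoff = time.Duration(float64(backoff) * opts.Multiplier)
		if backoff > opts.MaxBackoff {
			backoff = opts.MaxBackoff // cap the wait, keep retrying
		}
	}
	return err
}

func main() {
	opts := retryOptions{
		InitialBackoff: 50 * time.Millisecond, // assumed
		MaxBackoff:     2 * time.Second,       // mirrors retryOpts.MaxBackoff in the diff
		Multiplier:     2,                      // assumed
		MaxRetries:     100,                    // mirrors retryOpts.MaxRetries in the diff
	}
	calls := 0
	err := doWithRetry(opts, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient upload failure")
		}
		return nil
	})
	fmt.Println("final result:", err) // <nil> once the simulated upload succeeds
}
```

Capping the backoff at a couple of seconds while allowing many more retries lets each payload ride out transient Datadog errors without the per-attempt wait growing unboundedly.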
