Skip to content

Commit

Permalink
Add support for dockerhub rate-limiting
Browse files Browse the repository at this point in the history
Signed-off-by: Jiri Vrba <[email protected]>
  • Loading branch information
Grimm75 authored and Jiri Vrba committed Apr 2, 2024
1 parent cea47c7 commit fd3c0b8
Showing 1 changed file with 57 additions and 6 deletions.
63 changes: 57 additions & 6 deletions src/pkg/reg/adapter/dockerhub/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/lib/log"
Expand Down Expand Up @@ -124,6 +126,55 @@ func getAdapterInfo() *model.AdapterPattern {
return info
}

// Rate-limit aware wrapper function for client.Do()
// - Avoids being hit by limit by pausing requests when less than 'lowMark' requests remaining.
// - Pauses for given time when limit is hit.
// - Allows 2 more attempts before giving up.
// Reason: Observed (02/2024) penalty for hitting the limit is 120s, normal reset is 60s,
// so it is better to not hit the wall.
func (a *adapter) limitAwareDo(method string, path string, body io.Reader) (*http.Response, error) {
const lowMark = 8
var attemptsLeft = 3
for attemptsLeft > 0 {
clientResp, clientErr := a.client.Do(method, path, body)
if clientErr != nil {
return clientResp, clientErr
}
if clientResp.StatusCode != http.StatusTooManyRequests {
reqsLeft, err := strconv.ParseInt(clientResp.Header.Get("x-ratelimit-remaining"), 10, 64)
if err != nil {
return clientResp, clientErr
}
if reqsLeft < lowMark {
resetTSC, err := strconv.ParseInt(clientResp.Header.Get("x-ratelimit-reset"), 10, 64)
if err == nil {
dur := time.Until(time.Unix(resetTSC, 0))
log.Infof("Rate-limit exhaustion eminent, sleeping for %.1f seconds", dur.Seconds())
time.Sleep(dur)
log.Info("Sleep finished, resuming operation")
}
}
return clientResp, clientErr
}
var dur = time.Duration(0)
seconds, err := strconv.ParseInt(clientResp.Header.Get("retry-after"), 10, 64)
if err != nil {
expireTime, err := http.ParseTime(clientResp.Header.Get("retry-after"))
if err != nil {
return nil, errors.New("blocked by dockerhub rate-limit and missing retry-after header")
}
dur = time.Until(expireTime)
} else {
dur = time.Duration(seconds) * time.Second
}
log.Infof("Rate-limit exhausted, sleeping for %.1f seconds", dur.Seconds())
time.Sleep(dur)
log.Info("Sleep finished, resuming operation")
attemptsLeft--
}
return nil, errors.New("unable to get past dockerhub rate-limit")
}

// PrepareForPush does the prepare work that needed for pushing/uploading the resource
// eg: create the namespace or repository
func (a *adapter) PrepareForPush(resources []*model.Resource) error {
Expand Down Expand Up @@ -159,7 +210,7 @@ func (a *adapter) PrepareForPush(resources []*model.Resource) error {
}

func (a *adapter) listNamespaces() ([]string, error) {
resp, err := a.client.Do(http.MethodGet, listNamespacePath, nil)
resp, err := a.limitAwareDo(http.MethodGet, listNamespacePath, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,7 +258,7 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {
return err
}

resp, err := a.client.Do(http.MethodPost, createNamespacePath, bytes.NewReader(b))
resp, err := a.limitAwareDo(http.MethodPost, createNamespacePath, bytes.NewReader(b))
if err != nil {
return err
}
Expand All @@ -228,7 +279,7 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error {

// getNamespace get namespace from DockerHub, if the namespace not found, two nil would be returned.
func (a *adapter) getNamespace(namespace string) (*model.Namespace, error) {
resp, err := a.client.Do(http.MethodGet, getNamespacePath(namespace), nil)
resp, err := a.limitAwareDo(http.MethodGet, getNamespacePath(namespace), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -389,7 +440,7 @@ func (a *adapter) DeleteManifest(repository, reference string) error {
return fmt.Errorf("dockerhub only support repo in format <namespace>/<name>, but got: %s", repository)
}

resp, err := a.client.Do(http.MethodDelete, deleteTagPath(parts[0], parts[1], reference), nil)
resp, err := a.limitAwareDo(http.MethodDelete, deleteTagPath(parts[0], parts[1], reference), nil)
if err != nil {
return err
}
Expand All @@ -410,7 +461,7 @@ func (a *adapter) DeleteManifest(repository, reference string) error {

// getRepos gets a page of repos from DockerHub
func (a *adapter) getRepos(namespace, name string, page, pageSize int) (*ReposResp, error) {
resp, err := a.client.Do(http.MethodGet, listReposPath(namespace, name, page, pageSize), nil)
resp, err := a.limitAwareDo(http.MethodGet, listReposPath(namespace, name, page, pageSize), nil)
if err != nil {
return nil, err
}
Expand All @@ -437,7 +488,7 @@ func (a *adapter) getRepos(namespace, name string, page, pageSize int) (*ReposRe

// getTags gets a page of tags for a repo from DockerHub
func (a *adapter) getTags(namespace, repo string, page, pageSize int) (*TagsResp, error) {
resp, err := a.client.Do(http.MethodGet, listTagsPath(namespace, repo, page, pageSize), nil)
resp, err := a.limitAwareDo(http.MethodGet, listTagsPath(namespace, repo, page, pageSize), nil)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fd3c0b8

Please sign in to comment.