Skip to content

Commit

Permalink
Merge branch 'kfp-release-2.0.1' into 2.0.x
Browse files Browse the repository at this point in the history
Signed-off-by: Yihong Wang <[email protected]>
  • Loading branch information
yhwang committed Oct 2, 2023
2 parents 5a696bb + 63ca918 commit 5b6afdb
Show file tree
Hide file tree
Showing 42 changed files with 487 additions and 132 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog

### [2.0.1](https://github.com/kubeflow/pipelines/compare/2.0.0...2.0.1) (2023-08-17)


### Bug Fixes

* **backend:** Fix performance issue within a mysql request ([\#9680](https://github.com/kubeflow/pipelines/issues/9680)) ([81618d0](https://github.com/kubeflow/pipelines/commit/81618d0fd6810560e0b78c61776d73042bd6f3bb))
* **backend:** fix timeouts with list run api. Fixes [\#9780](https://github.com/kubeflow/pipelines/issues/9780) ([\#9806](https://github.com/kubeflow/pipelines/issues/9806)) ([c467ece](https://github.com/kubeflow/pipelines/commit/c467ece30551046fa0304a6a7067d3e185d7cf14))
* **frontend:** Introduce ALLOWED_ARTIFACT_DOMAIN_REGEX flag to prevent accessing undesired domains. Remove user input string from server response. ([\#9844](https://github.com/kubeflow/pipelines/issues/9844)) ([737c0cc](https://github.com/kubeflow/pipelines/commit/737c0cc12606da3994e978678ace7adb1b309944))


### Other Pull Requests

* Fix Persistence Agent SA Token time interval ([\#9892](https://github.com/kubeflow/pipelines/issues/9892)) ([681c46f](https://github.com/kubeflow/pipelines/commit/681c46f62bb1d3aa5e1e4db2a239c7c4dd64881a))
* feat(backend) Enable auth between pesistence agent and pipelineAPI (ReportServer) ([\#9699](https://github.com/kubeflow/pipelines/issues/9699)) ([f232d0b](https://github.com/kubeflow/pipelines/commit/f232d0b3902bf666a2bfdc65ac6f93934e010083))
* fix(backend) Replace LEFT with INNER JOIN when Archive Experiment ([\#9730](https://github.com/kubeflow/pipelines/issues/9730)) ([5593dee](https://github.com/kubeflow/pipelines/commit/5593dee729b0b9518c1a70dbc3f0052796c4f10a))

## [2.0.0](https://github.com/kubeflow/pipelines/compare/1.7.0...2.0.0) (2023-06-20)


Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0
2.0.1
4 changes: 2 additions & 2 deletions backend/api/v1beta1/python_http_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ This file contains REST API specification for Kubeflow Pipelines. The file is au

This Python package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project:

- API version: 2.0.0
- Package version: 2.0.0
- API version: 2.0.1
- Package version: 2.0.1
- Build package: org.openapitools.codegen.languages.PythonClientCodegen
For more information, please visit [https://www.google.com](https://www.google.com)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import absolute_import

__version__ = "2.0.0"
__version__ = "2.0.1"

# import apis into sdk package
from kfp_server_api.api.experiment_service_api import ExperimentServiceApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, configuration=None, header_name=None, header_value=None,
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'OpenAPI-Generator/2.0.0/python'
self.user_agent = 'OpenAPI-Generator/2.0.1/python'
self.client_side_validation = configuration.client_side_validation

def __enter__(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ def to_debug_report(self):
return "Python SDK Debug Report:\n"\
"OS: {env}\n"\
"Python Version: {pyversion}\n"\
"Version of the API: 2.0.0\n"\
"SDK Package Version: 2.0.0".\
"Version of the API: 2.0.1\n"\
"SDK Package Version: 2.0.1".\
format(env=sys.platform, pyversion=sys.version)

def get_host_settings(self):
Expand Down
2 changes: 1 addition & 1 deletion backend/api/v1beta1/python_http_client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from setuptools import setup, find_packages # noqa: H301

NAME = "kfp-server-api"
VERSION = "2.0.0"
VERSION = "2.0.1"
# To install the library, run the following
#
# python setup.py install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Kubeflow Pipelines API",
"version": "2.0.0",
"version": "2.0.1",
"description": "This file contains REST API specification for Kubeflow Pipelines. The file is autogenerated from the swagger definition.",
"contact": {
"name": "google",
Expand Down
4 changes: 2 additions & 2 deletions backend/api/v2beta1/python_http_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ This file contains REST API specification for Kubeflow Pipelines. The file is au

This Python package is automatically generated by the [OpenAPI Generator](https://openapi-generator.tech) project:

- API version: 2.0.0
- Package version: 2.0.0
- API version: 2.0.1
- Package version: 2.0.1
- Build package: org.openapitools.codegen.languages.PythonClientCodegen
For more information, please visit [https://www.google.com](https://www.google.com)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import absolute_import

__version__ = "2.0.0"
__version__ = "2.0.1"

# import apis into sdk package
from kfp_server_api.api.auth_service_api import AuthServiceApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def __init__(self, configuration=None, header_name=None, header_value=None,
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = 'OpenAPI-Generator/2.0.0/python'
self.user_agent = 'OpenAPI-Generator/2.0.1/python'
self.client_side_validation = configuration.client_side_validation

def __enter__(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ def to_debug_report(self):
return "Python SDK Debug Report:\n"\
"OS: {env}\n"\
"Python Version: {pyversion}\n"\
"Version of the API: 2.0.0\n"\
"SDK Package Version: 2.0.0".\
"Version of the API: 2.0.1\n"\
"SDK Package Version: 2.0.1".\
format(env=sys.platform, pyversion=sys.version)

def get_host_settings(self):
Expand Down
2 changes: 1 addition & 1 deletion backend/api/v2beta1/python_http_client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from setuptools import setup, find_packages # noqa: H301

NAME = "kfp-server-api"
VERSION = "2.0.0"
VERSION = "2.0.1"
# To install the library, run the following
#
# python setup.py install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Kubeflow Pipelines API",
"version": "2.0.0",
"version": "2.0.1",
"description": "This file contains REST API specification for Kubeflow Pipelines. The file is autogenerated from the swagger definition.",
"contact": {
"name": "google",
Expand Down
34 changes: 32 additions & 2 deletions backend/src/agent/persistence/client/pipeline_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/kubeflow/pipelines/backend/src/apiserver/common"
Expand Down Expand Up @@ -47,11 +48,13 @@ type PipelineClient struct {
timeout time.Duration
reportServiceClient apiv2.ReportServiceClient
runServiceClient api.RunServiceClient
tokenRefresher TokenRefresherInterface
}

func NewPipelineClient(
initializeTimeout time.Duration,
timeout time.Duration,
tokenRefresher TokenRefresherInterface,
basePath string,
mlPipelineServiceName string,
mlPipelineServiceHttpPort string,
Expand All @@ -73,12 +76,17 @@ func NewPipelineClient(
initializeTimeout: initializeTimeout,
timeout: timeout,
reportServiceClient: apiv2.NewReportServiceClient(connection),
tokenRefresher: tokenRefresher,
runServiceClient: api.NewRunServiceClient(connection),
}, nil
}

func (p *PipelineClient) ReportWorkflow(workflow util.ExecutionSpec) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
pctx := context.Background()
pctx = metadata.AppendToOutgoingContext(pctx, "Authorization",
"Bearer "+p.tokenRefresher.GetToken())

ctx, cancel := context.WithTimeout(pctx, time.Minute)
defer cancel()

_, err := p.reportServiceClient.ReportWorkflow(ctx, &apiv2.ReportWorkflowRequest{
Expand All @@ -97,6 +105,15 @@ func (p *PipelineClient) ReportWorkflow(workflow util.ExecutionSpec) error {
statusCode.Message(),
err.Error(),
workflow.ToStringForStore())
} else if statusCode.Code() == codes.Unauthenticated && strings.Contains(err.Error(), "service account token has expired") {
// If unauthenticated because SA token is expired, re-read/refresh the token and try again
p.tokenRefresher.RefreshToken()
return util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
"Error while reporting workflow resource (code: %v, message: %v): %v, %+v",
statusCode.Code(),
statusCode.Message(),
err.Error(),
workflow.ToStringForStore())
} else {
// Retry otherwise
return util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
Expand All @@ -111,7 +128,11 @@ func (p *PipelineClient) ReportWorkflow(workflow util.ExecutionSpec) error {
}

func (p *PipelineClient) ReportScheduledWorkflow(swf *util.ScheduledWorkflow) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
pctx := context.Background()
pctx = metadata.AppendToOutgoingContext(pctx, "Authorization",
"Bearer "+p.tokenRefresher.GetToken())

ctx, cancel := context.WithTimeout(pctx, time.Minute)
defer cancel()

_, err := p.reportServiceClient.ReportScheduledWorkflow(ctx,
Expand All @@ -129,6 +150,15 @@ func (p *PipelineClient) ReportScheduledWorkflow(swf *util.ScheduledWorkflow) er
statusCode.Message(),
err.Error(),
swf.ScheduledWorkflow)
} else if statusCode.Code() == codes.Unauthenticated && strings.Contains(err.Error(), "service account token has expired") {
// If unauthenticated because SA token is expired, re-read/refresh the token and try again
p.tokenRefresher.RefreshToken()
return util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
"Error while reporting workflow resource (code: %v, message: %v): %v, %+v",
statusCode.Code(),
statusCode.Message(),
err.Error(),
swf.ScheduledWorkflow)
} else {
// Retry otherwise
return util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
Expand Down
78 changes: 78 additions & 0 deletions backend/src/agent/persistence/client/token_refresher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package client

import (
log "github.com/sirupsen/logrus"
"os"
"sync"
"time"
)

type TokenRefresherInterface interface {
GetToken() string
RefreshToken() error
}

const SaTokenFile = "/var/run/secrets/kubeflow/tokens/persistenceagent-sa-token"

type FileReader interface {
ReadFile(filename string) ([]byte, error)
}

type tokenRefresher struct {
mu sync.RWMutex
interval *time.Duration
token string
fileReader *FileReader
}

type FileReaderImpl struct{}

func (r *FileReaderImpl) ReadFile(filename string) ([]byte, error) {
return os.ReadFile(filename)
}

func NewTokenRefresher(interval time.Duration, fileReader FileReader) *tokenRefresher {
if fileReader == nil {
fileReader = &FileReaderImpl{}
}

tokenRefresher := &tokenRefresher{
interval: &interval,
fileReader: &fileReader,
}

return tokenRefresher
}

func (tr *tokenRefresher) StartTokenRefreshTicker() error {
err := tr.RefreshToken()
if err != nil {
return err
}

ticker := time.NewTicker(*tr.interval)
go func() {
for range ticker.C {
tr.RefreshToken()
}
}()
return err
}

func (tr *tokenRefresher) GetToken() string {
tr.mu.RLock()
defer tr.mu.RUnlock()
return tr.token
}

func (tr *tokenRefresher) RefreshToken() error {
tr.mu.Lock()
defer tr.mu.Unlock()
b, err := (*tr.fileReader).ReadFile(SaTokenFile)
if err != nil {
log.Errorf("Error reading persistence agent service account token '%s': %v", SaTokenFile, err)
return err
}
tr.token = string(b)
return nil
}
Loading

0 comments on commit 5b6afdb

Please sign in to comment.