Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update/aws-tests #123

Merged
merged 4 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ jobs:
runs-on: ubuntu-latest
needs: validate
# build only on master branch and tags
if: ${{ !contains(github.event.head_commit.message,'[skip ci]') && (github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/'))) }}
if: ${{
!contains(github.event.head_commit.message, '[skip ci]') &&
(
(github.event_name == 'push' && (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/tags/'))) ||
(github.event_name == 'pull_request' && github.event.pull_request.draft == false)
)
}}
steps:
- name: checkout
uses: actions/checkout@v4
Expand All @@ -82,7 +88,6 @@ jobs:
uses: docker/setup-buildx-action@v3

- name: login to DockerHub
if: ${{ github.event_name != 'pull_request' }}
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
Expand Down
7 changes: 5 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign

for {
err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
if err != nil && errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
log.Infof("static public IP address already assigned to node instance %s", node.Instance)
return nil
}
if err != nil {
log.WithError(err).Errorf("failed to assign static public IP address to node %s", node.Name)
if retryCounter < cfg.RetryAttempts {
Expand All @@ -105,9 +109,8 @@ func assignAddress(c context.Context, log *logrus.Entry, assigner address.Assign
return errors.Wrap(err, "context is done")
}
}
break
return nil
}
return nil
}

func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
Expand Down
2 changes: 1 addition & 1 deletion examples/aws/eks.tf
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
priority_class_name = "system-node-critical"
container {
name = "kubeip-agent"
image = "doitintl/kubeip-agent"
image = "doitintl/kubeip-agent:${var.kubeip_version}"
env {
name = "NODE_NAME"
value_from {
Expand Down
5 changes: 5 additions & 0 deletions examples/aws/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ variable "public_cidr_ranges" {
variable "kubernetes_version" {
type = string
default = "1.28"
}

variable "kubeip_version" {
type = string
default = "latest"
}
2 changes: 1 addition & 1 deletion examples/gcp/gke.tf
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ resource "kubernetes_daemonset" "kubeip_daemonset" {
priority_class_name = "system-node-critical"
container {
name = "kubeip-agent"
image = "doitintl/kubeip-agent"
image = "doitintl/kubeip-agent:${var.kubeip_version}"
env {
name = "NODE_NAME"
value_from {
Expand Down
5 changes: 5 additions & 0 deletions examples/gcp/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ variable "machine_type" {
variable "ipv6_support" {
type = bool
default = false
}

variable "kubeip_version" {
type = string
default = "latest"
}
4 changes: 3 additions & 1 deletion internal/address/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
)

var (
ErrUnknownCloudProvider = errors.New("unknown cloud provider")
ErrUnknownCloudProvider = errors.New("unknown cloud provider")
ErrStaticIPAlreadyAssigned = errors.New("static public IP already assigned")
ErrNoStaticIPAssigned = errors.New("no static public IP assigned")
)

type Assigner interface {
Expand Down
184 changes: 104 additions & 80 deletions internal/address/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,54 +165,77 @@ func (a *awsAssigner) forceCheckAddressAssigned(ctx context.Context, allocationI
return false, nil
}

//nolint:funlen,gocyclo
func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter []string, orderBy string) error {
// get elastic IP attached to the instance
filters := make(map[string][]string)
filters["instance-id"] = []string{instanceID}
addresses, err := a.eipLister.List(ctx, filters, true)
err := a.checkElasticIPAssigned(ctx, instanceID)
if err != nil {
return errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
return errors.Wrapf(err, "check if elastic IP is already assigned to instance %s", instanceID)
}
if len(addresses) > 0 {
a.logger.Infof("elastic IP %s is already attached to instance %s", *addresses[0].PublicIp, instanceID)
return nil

// get available elastic IPs based on filter and orderBy
addresses, err := a.getAvailableElasticIPs(ctx, filter, orderBy)
if err != nil {
return errors.Wrap(err, "failed to get available elastic IPs")
}

// get available elastic IPs
filters = make(map[string][]string)
for _, f := range filter {
name, values, err2 := parseShorthandFilter(f)
if err2 != nil {
return errors.Wrapf(err2, "failed to parse filter %s", f)
}
filters[name] = values
// get EC2 instance
instance, err := a.instanceGetter.Get(ctx, instanceID, a.region)
if err != nil {
return errors.Wrapf(err, "failed to get instance %s", instanceID)
}
addresses, err = a.eipLister.List(context.Background(), filters, false)
// get primary network interface ID with public IP address (DeviceIndex == 0)
networkInterfaceID, err := a.getNetworkInterfaceID(instance)
if err != nil {
return errors.Wrap(err, "failed to list available elastic IPs")
return errors.Wrapf(err, "failed to get network interface ID for instance %s", instanceID)
}

// if no available elastic IPs, return error
if len(addresses) == 0 {
return errors.Errorf("no available elastic IPs")
// try to assign available addresses until succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
for i := range addresses {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
"networkInterfaceID": networkInterfaceID,
}).Debug("assigning elastic IP to the instance")
err = a.tryAssignAddress(ctx, &addresses[i], networkInterfaceID, instanceID)
if err != nil {
a.logger.WithError(err).Warn("failed to assign elastic IP address")
a.logger.Debug("retrying with another address")
} else {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
}).Info("elastic IP assigned to the instance")
break // break if address assigned successfully
}
}

// log available addresses IPs
ips := make([]string, 0, len(addresses))
for _, address := range addresses {
ips = append(ips, *address.PublicIp)
if err != nil {
return errors.Wrap(err, "failed to assign elastic IP address")
}
a.logger.WithField("addresses", ips).Debugf("found %d available addresses", len(addresses))
return nil
}

// get EC2 instance
instance, err := a.instanceGetter.Get(ctx, instanceID, a.region)
func (a *awsAssigner) tryAssignAddress(ctx context.Context, address *types.Address, networkInterfaceID, instanceID string) error {
// force check if address is already assigned (reduce the chance of assigning the same address by multiple kubeip instances)
addressAssigned, err := a.forceCheckAddressAssigned(ctx, *address.AllocationId)
if err != nil {
return errors.Wrapf(err, "failed to get instance %s", instanceID)
return errors.Wrapf(err, "failed to check if address %s is assigned", *address.PublicIp)
}
if addressAssigned {
return errors.Errorf("address %s is already assigned", *address.PublicIp)
}
if err = a.eipAssigner.Assign(ctx, networkInterfaceID, *address.AllocationId); err != nil {
return errors.Wrapf(err, "failed to assign elastic IP %s to the instance %s", *address.PublicIp, instanceID)
}
return nil
}

func (a *awsAssigner) getNetworkInterfaceID(instance *types.Instance) (string, error) {
// get network interface ID
if instance.NetworkInterfaces == nil || len(instance.NetworkInterfaces) == 0 {
return errors.Errorf("no network interfaces found for instance %s", instanceID)
return "", errors.Errorf("no network interfaces found for instance %s", *instance.InstanceId)
}
// get primary network interface ID with public IP address (DeviceIndex == 0)
networkInterfaceID := ""
Expand All @@ -224,72 +247,73 @@ func (a *awsAssigner) Assign(ctx context.Context, instanceID, _ string, filter [
}
}
if networkInterfaceID == "" {
return errors.Errorf("no network interfaces with public IP address found for instance %s", instanceID)
return "", errors.Errorf("no network interfaces with public IP address found for instance %s", *instance.InstanceId)
}
return networkInterfaceID, nil
}

// sort addresses by orderBy field
sortAddressesByField(addresses, orderBy)

// try to assign all available addresses until one succeeds
// due to concurrency, it is possible that another kubeip instance will assign the same address
for i := range addresses {
// force check if address is already assigned (reduce the chance of assigning the same address by multiple kubeip instances)
var addressAssigned bool
addressAssigned, err = a.forceCheckAddressAssigned(ctx, *addresses[i].AllocationId)
if err != nil {
a.logger.WithError(err).Errorf("failed to check if address %s is assigned", *addresses[i].PublicIp)
a.logger.Debug("trying next address")
continue
}
if addressAssigned {
a.logger.WithField("address", addresses[i].PublicIp).Debug("address is already assigned")
a.logger.Debug("trying next address")
continue
}
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
"networkInterfaceID": networkInterfaceID,
}).Debug("assigning elastic IP to the instance")
if err = a.eipAssigner.Assign(ctx, networkInterfaceID, *addresses[i].AllocationId); err != nil {
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
"networkInterfaceID": networkInterfaceID,
}).Debug("failed to assign elastic IP to the instance")
a.logger.Debug("trying next address")
continue
}
a.logger.WithFields(logrus.Fields{
"instance": instanceID,
"address": *addresses[i].PublicIp,
"allocation_id": *addresses[i].AllocationId,
}).Info("elastic IP assigned to the instance")
break
}
func (a *awsAssigner) checkElasticIPAssigned(ctx context.Context, instanceID string) error {
filters := make(map[string][]string)
filters["instance-id"] = []string{instanceID}
addresses, err := a.eipLister.List(ctx, filters, true)
if err != nil {
return errors.Wrap(err, "failed to assign elastic IP address")
return errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
}
if len(addresses) > 0 {
return ErrStaticIPAlreadyAssigned
}
return nil
}

func (a *awsAssigner) Unassign(ctx context.Context, instanceID, _ string) error {
func (a *awsAssigner) getAssignedElasticIP(ctx context.Context, instanceID string) (*types.Address, error) {
// get elastic IP attached to the instance
filters := make(map[string][]string)
filters["instance-id"] = []string{instanceID}
addresses, err := a.eipLister.List(ctx, filters, true)
if err != nil {
return errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
return nil, errors.Wrapf(err, "failed to list elastic IPs attached to instance %s", instanceID)
}
if len(addresses) == 0 {
return nil, ErrNoStaticIPAssigned
}
return &addresses[0], nil
}

func (a *awsAssigner) getAvailableElasticIPs(ctx context.Context, filter []string, orderBy string) ([]types.Address, error) {
filters := make(map[string][]string)
for _, f := range filter {
name, values, err := parseShorthandFilter(f)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse filter %s", f)
}
filters[name] = values
}
addresses, err := a.eipLister.List(ctx, filters, false)
if err != nil {
return nil, errors.Wrap(err, "failed to list available elastic IPs")
}
if len(addresses) == 0 {
a.logger.Infof("no elastic IP attached to instance %s", instanceID)
return nil
return nil, errors.Errorf("no available elastic IPs")
}
// sort addresses by orderBy field
sortAddressesByField(addresses, orderBy)
// log available addresses IPs
ips := make([]string, 0, len(addresses))
for _, address := range addresses {
ips = append(ips, *address.PublicIp)
}
a.logger.WithField("addresses", ips).Debugf("Found %d available addresses", len(addresses))

return addresses, nil
}

func (a *awsAssigner) Unassign(ctx context.Context, instanceID, _ string) error {
// get elastic IP attached to the instance
address, err := a.getAssignedElasticIP(ctx, instanceID)
if err != nil {
return errors.Wrapf(err, "check if elastic IP is assigned to instance %s", instanceID)
}
// unassign elastic IP from the instance
address := addresses[0]
if err = a.eipAssigner.Unassign(ctx, *address.AssociationId); err != nil {
return errors.Wrap(err, "failed to unassign elastic IP")
}
Expand Down
Loading
Loading