From c7886ee4ef521f56a16385436a9b6693a9cef2bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan-Luis=20de=20Sousa-Valadas=20Casta=C3=B1o?= Date: Wed, 20 Nov 2024 20:12:04 +0100 Subject: [PATCH] Implement CPLB userspace reverse proxy LB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IPVS is problematic for many reasons, implement a userspace load balancer which should get the job done and should be far less problematic. Signed-off-by: Juan-Luis de Sousa-Valadas CastaƱo --- cmd/controller/controller.go | 7 + cmd/worker/worker.go | 11 +- inttest/Makefile.variables | 3 +- .../cplbipvs_test.go} | 16 +- inttest/cplb-userspace/cplbuserspace_test.go | 219 ++++++++++++++++++ inttest/customports/customports_test.go | 2 +- pkg/apis/k0s/v1beta1/cplb.go | 14 ++ pkg/component/controller/cplb/cplb_linux.go | 161 ++++++++++--- pkg/config/cli.go | 1 + .../k0s/k0s.k0sproject.io_clusterconfigs.yaml | 9 + 10 files changed, 397 insertions(+), 46 deletions(-) rename inttest/{cplb/cplb_test.go => cplb-ipvs/cplbipvs_test.go} (92%) create mode 100644 inttest/cplb-userspace/cplbuserspace_test.go diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index ade3efee648c..3a4d7d0113b0 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -47,6 +47,7 @@ import ( "github.com/k0sproject/k0s/pkg/component/controller/cplb" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/k0sproject/k0s/pkg/component/controller/workerconfig" + "github.com/k0sproject/k0s/pkg/component/iptables" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/component/prober" "github.com/k0sproject/k0s/pkg/component/status" @@ -238,6 +239,11 @@ func (c *command) start(ctx context.Context) error { // Assume a single active controller during startup numActiveControllers := value.NewLatest[uint](1) + nodeComponents.Add(ctx, &iptables.Component{ + IPTablesMode: c.WorkerOptions.IPTablesMode, + BinDir: c.K0sVars.BinDir, + }) + if cplbCfg := nodeConfig.Spec.Network.ControlPlaneLoadBalancing; cplbCfg != nil && cplbCfg.Enabled { if c.SingleNode { return errors.New("control plane load balancing cannot be used in a single-node cluster") @@ -663,6 +669,7 @@ func (c *command) startWorker(ctx context.Context, profile string, nodeConfig *v wc.TokenArg = bootstrapConfig wc.WorkerProfile = profile wc.Labels = append(wc.Labels, fields.OneTermEqualSelector(constant.K0SNodeRoleLabel, "control-plane").String()) + wc.DisableIPTables = true if !c.SingleNode && !c.NoTaints { key := path.Join(constant.NodeRoleLabelNamespace, "master") taint := fields.OneTermEqualSelector(key, ":NoSchedule") diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 6a22aa8b2981..6b0aefe92e24 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -148,11 +148,12 @@ func (c *Command) Start(ctx context.Context) error { c.WorkerProfile = "default-windows" } - componentManager.Add(ctx, &iptables.Component{ - IPTablesMode: c.WorkerOptions.IPTablesMode, - BinDir: c.K0sVars.BinDir, - }) - + if !c.DisableIPTables { + componentManager.Add(ctx, &iptables.Component{ + IPTablesMode: c.WorkerOptions.IPTablesMode, + BinDir: c.K0sVars.BinDir, + }) + } componentManager.Add(ctx, &worker.Kubelet{ CRISocket: c.CriSocket, diff --git a/inttest/Makefile.variables b/inttest/Makefile.variables index 2194e5612212..856fa1c64438 100644 --- a/inttest/Makefile.variables +++ b/inttest/Makefile.variables @@ -24,7 +24,8 @@ smoketests := \ check-cnichange \ check-configchange \ check-containerdimports \ - check-cplb \ + check-cplb-ipvs \ + check-cplb-userspace \ check-ctr \ check-custom-cidrs \ check-customca \ diff --git a/inttest/cplb/cplb_test.go b/inttest/cplb-ipvs/cplbipvs_test.go similarity index 92% rename from inttest/cplb/cplb_test.go rename to inttest/cplb-ipvs/cplbipvs_test.go index 0b7063fd05c2..83d8347c99fb 100644 --- a/inttest/cplb/cplb_test.go +++ b/inttest/cplb-ipvs/cplbipvs_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/suite" ) -type keepalivedSuite struct { +type CPLBIPVSSuite struct { common.BootlooseSuite } @@ -50,7 +50,7 @@ spec: // SetupTest prepares the controller and filesystem, getting it into a consistent // state which we can run tests against. -func (s *keepalivedSuite) TestK0sGetsUp() { +func (s *CPLBIPVSSuite) TestK0sGetsUp() { lb := s.getLBAddress() ctx := s.Context() var joinToken string @@ -111,7 +111,7 @@ func (s *keepalivedSuite) TestK0sGetsUp() { // getLBAddress returns the IP address of the controller 0 and it adds 100 to // the last octet unless it's bigger or equal to 154, in which case it // subtracts 100. Theoretically this could result in an invalid IP address. -func (s *keepalivedSuite) getLBAddress() string { +func (s *CPLBIPVSSuite) getLBAddress() string { ip := s.GetIPAddress(s.ControllerNode(0)) parts := strings.Split(ip, ".") if len(parts) != 4 { @@ -130,7 +130,7 @@ func (s *keepalivedSuite) getLBAddress() string { // validateRealServers checks that the real servers are present in the // ipvsadm output. -func (s *keepalivedSuite) validateRealServers(ctx context.Context, node string, vip string) { +func (s *CPLBIPVSSuite) validateRealServers(ctx context.Context, node string, vip string) { ssh, err := s.SSH(ctx, node) s.Require().NoError(err) defer ssh.Disconnect() @@ -151,7 +151,7 @@ func (s *keepalivedSuite) validateRealServers(ctx context.Context, node string, // checkDummy checks that the dummy interface is present on the given node and // that it has only the virtual IP address. -func (s *keepalivedSuite) checkDummy(ctx context.Context, node string, vip string) { +func (s *CPLBIPVSSuite) checkDummy(ctx context.Context, node string, vip string) { ssh, err := s.SSH(ctx, node) s.Require().NoError(err) defer ssh.Disconnect() @@ -167,7 +167,7 @@ func (s *keepalivedSuite) checkDummy(ctx context.Context, node string, vip strin // hasVIP checks that the dummy interface is present on the given node and // that it has only the virtual IP address. -func (s *keepalivedSuite) hasVIP(ctx context.Context, node string, vip string) bool { +func (s *CPLBIPVSSuite) hasVIP(ctx context.Context, node string, vip string) bool { ssh, err := s.SSH(ctx, node) s.Require().NoError(err) defer ssh.Disconnect() @@ -180,8 +180,8 @@ func (s *keepalivedSuite) hasVIP(ctx context.Context, node string, vip string) b // TestKeepAlivedSuite runs the keepalived test suite. It verifies that the // virtual IP is working by joining a node to the cluster using the VIP. -func TestKeepAlivedSuite(t *testing.T) { - suite.Run(t, &keepalivedSuite{ +func TestCPLBIPVSSuite(t *testing.T) { + suite.Run(t, &CPLBIPVSSuite{ common.BootlooseSuite{ ControllerCount: 3, WorkerCount: 1, diff --git a/inttest/cplb-userspace/cplbuserspace_test.go b/inttest/cplb-userspace/cplbuserspace_test.go new file mode 100644 index 000000000000..270a4c72f081 --- /dev/null +++ b/inttest/cplb-userspace/cplbuserspace_test.go @@ -0,0 +1,219 @@ +// Copyright 2024 k0s authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keepalived + +import ( + "context" + "crypto/tls" + "encoding/hex" + "fmt" + "net/http" + "strconv" + "strings" + "testing" + "time" + + "github.com/k0sproject/k0s/inttest/common" + + "github.com/stretchr/testify/suite" +) + +type CPLBUserSpaceSuite struct { + common.BootlooseSuite +} + +const haControllerConfig = ` +spec: + network: + controlPlaneLoadBalancing: + enabled: true + type: Keepalived + keepalived: + vrrpInstances: + - virtualIPs: ["%s/16"] + authPass: "123456" + nodeLocalLoadBalancing: + enabled: true + type: EnvoyProxy +` + +// SetupTest prepares the controller and filesystem, getting it into a consistent +// state which we can run tests against. +func (s *CPLBUserSpaceSuite) TestK0sGetsUp() { + lb := s.getLBAddress() + ctx := s.Context() + var joinToken string + + for idx := range s.BootlooseSuite.ControllerCount { + s.Require().NoError(s.WaitForSSH(s.ControllerNode(idx), 2*time.Minute, 1*time.Second)) + s.PutFile(s.ControllerNode(idx), "/tmp/k0s.yaml", fmt.Sprintf(haControllerConfig, lb)) + + // Note that the token is intentionally empty for the first controller + s.Require().NoError(s.InitController(idx, "--config=/tmp/k0s.yaml", "--disable-components=metrics-server", "--enable-worker", joinToken)) + s.Require().NoError(s.WaitJoinAPI(s.ControllerNode(idx))) + + // With the primary controller running, create the join token for subsequent controllers. + if idx == 0 { + token, err := s.GetJoinToken("controller") + s.Require().NoError(err) + joinToken = token + } + } + + // Final sanity -- ensure all nodes see each other according to etcd + for idx := range s.BootlooseSuite.ControllerCount { + s.Require().Len(s.GetMembers(idx), s.BootlooseSuite.ControllerCount) + } + + // Create a worker join token + workerJoinToken, err := s.GetJoinToken("worker") + s.Require().NoError(err) + + // Start the workers using the join token + s.Require().NoError(s.RunWorkersWithToken(workerJoinToken)) + + client, err := s.KubeClient(s.ControllerNode(0)) + s.Require().NoError(err) + + for idx := range s.BootlooseSuite.ControllerCount { + s.Require().NoError(s.WaitForNodeReady(s.ControllerNode(idx), client)) + } + s.Require().NoError(s.WaitForNodeReady(s.WorkerNode(0), client)) + + // Verify that none of the servers has the dummy interface + for idx := range s.BootlooseSuite.ControllerCount { + s.checkDummy(ctx, s.ControllerNode(idx)) + } + + // Verify that only one controller has the VIP in eth0 + count := 0 + for idx := range s.BootlooseSuite.ControllerCount { + if s.hasVIP(ctx, s.ControllerNode(idx), lb) { + count++ + } + } + s.Require().Equal(1, count, "Expected exactly one controller to have the VIP") + + // Verify that controller+worker nodes are working normally. + s.T().Log("waiting to see CNI pods ready") + s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), client), "kube router did not start") + s.T().Log("waiting to see konnectivity-agent pods ready") + s.Require().NoError(common.WaitForDaemonSet(s.Context(), client, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start") + s.T().Log("waiting to get logs from pods") + s.Require().NoError(common.WaitForPodLogs(s.Context(), client, "kube-system")) + + s.T().Log("Testing that the load balancer is actually balancing the load") + // Other stuff may be querying the controller, so do 6 times the number of controllers + // to ensure we get a good sample. + signatures := make(map[string]int) + for _ = range s.BootlooseSuite.ControllerCount * 6 { + signature, err := getServerCertSignature(fmt.Sprintf("https://%s:6443", lb)) + s.Require().NoError(err) + signatures[signature] = 1 + } + + s.Require().Equal(3, len(signatures), "Expected 3 different signatures, got %d", len(signatures)) +} + +// getLBAddress returns the IP address of the controller 0 and it adds 100 to +// the last octet unless it's bigger or equal to 154, in which case it +// subtracts 100. Theoretically this could result in an invalid IP address. +func (s *CPLBUserSpaceSuite) getLBAddress() string { + ip := s.GetIPAddress(s.ControllerNode(0)) + parts := strings.Split(ip, ".") + if len(parts) != 4 { + s.T().Fatalf("Invalid IP address: %q", ip) + } + lastOctet, err := strconv.Atoi(parts[3]) + s.Require().NoErrorf(err, "Failed to convert last octet %q to int", parts[3]) + if lastOctet >= 154 { + lastOctet -= 100 + } else { + lastOctet += 100 + } + + return fmt.Sprintf("%s.%d", strings.Join(parts[:3], "."), lastOctet) +} + +// checkDummy checks that the dummy interface isn't present in the node. +func (s *CPLBUserSpaceSuite) checkDummy(ctx context.Context, node string) { + ssh, err := s.SSH(ctx, node) + s.Require().NoError(err) + defer ssh.Disconnect() + + _, err = ssh.ExecWithOutput(ctx, "ip --oneline addr show dummyvip0") + s.Require().Error(err) +} + +// hasVIP checks that the dummy interface is present on the given node and +// that it has only the virtual IP address. +func (s *CPLBUserSpaceSuite) hasVIP(ctx context.Context, node string, vip string) bool { + ssh, err := s.SSH(ctx, node) + s.Require().NoError(err) + defer ssh.Disconnect() + + output, err := ssh.ExecWithOutput(ctx, "ip --oneline addr show eth0") + s.Require().NoError(err) + + return strings.Contains(output, fmt.Sprintf("inet %s/16", vip)) +} + +// getServerCertSignature connects to the given HTTPS URL and returns the server certificate signature. +func getServerCertSignature(url string) (string, error) { + // Create a custom HTTP client with a custom TLS configuration + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, // Skip verification for demonstration purposes + }, + }, + } + + // Make a request to the URL + resp, err := client.Get(url) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // Get the TLS connection state + connState := resp.TLS + if connState == nil { + return "", fmt.Errorf("no TLS connection state") + } + + // Get the server certificate + if len(connState.PeerCertificates) == 0 { + return "", fmt.Errorf("no server certificate found") + } + cert := connState.PeerCertificates[0] + + // Get the certificate signature + signature := cert.Signature + + // Return the signature as a hex string + return hex.EncodeToString(signature), nil +} + +// TestKeepAlivedSuite runs the keepalived test suite. It verifies that the +// virtual IP is working by joining a node to the cluster using the VIP. +func TestCPLBUserSpaceSuite(t *testing.T) { + suite.Run(t, &CPLBUserSpaceSuite{ + common.BootlooseSuite{ + ControllerCount: 3, + WorkerCount: 1, + }, + }) +} diff --git a/inttest/customports/customports_test.go b/inttest/customports/customports_test.go index 9ff005449485..1d8cb6e37961 100644 --- a/inttest/customports/customports_test.go +++ b/inttest/customports/customports_test.go @@ -131,7 +131,7 @@ func (s *customPortsSuite) TestControllerJoinsWithCustomPort() { s.AssertSomeKubeSystemPods(kc) s.T().Log("waiting to see CNI pods ready") - s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), kc), "calico did not start") + s.Require().NoError(common.WaitForKubeRouterReady(s.Context(), kc), "kube-router did not start") s.T().Log("waiting to see konnectivity-agent pods ready") s.Require().NoError(common.WaitForDaemonSet(s.Context(), kc, "konnectivity-agent", "kube-system"), "konnectivity-agent did not start") diff --git a/pkg/apis/k0s/v1beta1/cplb.go b/pkg/apis/k0s/v1beta1/cplb.go index 0689a769212c..7fe9512d3178 100644 --- a/pkg/apis/k0s/v1beta1/cplb.go +++ b/pkg/apis/k0s/v1beta1/cplb.go @@ -67,6 +67,14 @@ type KeepalivedSpec struct { // Configuration options related to the virtual servers. This is an array // which allows to configure multiple load balancers. VirtualServers VirtualServers `json:"virtualServers,omitempty"` + // UserspaceProxyPort is the port where the userspace proxy will bind + // to. This port is only exposed on the localhost interface and is only + // used internally. Defaults to 6444. + // +kubebuilder:default=6444 + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=65535 + // +optional + UserSpaceProxyPort int `json:"userSpaceProxyBindPort,omitempty"` } // VRRPInstances is a list of VRRPInstance @@ -293,6 +301,12 @@ func (k *KeepalivedSpec) Validate(externalAddress string) (errs []error) { errs = append(errs, k.validateVRRPInstances(nil)...) errs = append(errs, k.validateVirtualServers()...) + if k.UserSpaceProxyPort == 0 { + k.UserSpaceProxyPort = 6444 + } else if k.UserSpaceProxyPort < 1 || k.UserSpaceProxyPort > 65535 { + errs = append(errs, errors.New("UserSpaceProxyPort must be in the range of 1-65535")) + } + // CPLB reconciler relies in watching kubernetes.default.svc endpoints if externalAddress != "" && len(k.VirtualServers) > 0 { errs = append(errs, errors.New(".spec.api.externalAddress and virtual servers cannot be used together")) diff --git a/pkg/component/controller/cplb/cplb_linux.go b/pkg/component/controller/cplb/cplb_linux.go index 1803435be105..4203dcff08e8 100644 --- a/pkg/component/controller/cplb/cplb_linux.go +++ b/pkg/component/controller/cplb/cplb_linux.go @@ -21,9 +21,13 @@ import ( "context" "errors" "fmt" + "log" "net" + "os/exec" "path/filepath" "slices" + "strconv" + "strings" "syscall" "text/template" "time" @@ -32,6 +36,7 @@ import ( "github.com/k0sproject/k0s/internal/pkg/users" k0sAPI "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/assets" + "github.com/k0sproject/k0s/pkg/component/controller/cplb/tcpproxy" "github.com/k0sproject/k0s/pkg/config" "github.com/k0sproject/k0s/pkg/constant" "github.com/k0sproject/k0s/pkg/supervisor" @@ -55,6 +60,7 @@ type Keepalived struct { reconciler *CPLBReconciler updateCh chan struct{} reconcilerDone chan struct{} + proxy tcpproxy.Proxy } // Init extracts the needed binaries and creates the directories @@ -83,13 +89,14 @@ func (k *Keepalived) Start(_ context.Context) error { return nil } - if len(k.Config.VRRPInstances) > 0 { + // We only need the dummy interface when using IPVS. + if len(k.Config.VirtualServers) > 0 { if err := k.configureDummy(); err != nil { return fmt.Errorf("failed to configure dummy interface: %w", err) } } - if len(k.Config.VirtualServers) > 0 { + if len(k.Config.VRRPInstances) > 0 || len(k.Config.VirtualServers) > 0 { k.log.Info("Starting CPLB reconciler") updateCh := make(chan struct{}, 1) k.reconciler = NewCPLBReconciler(k.KubeConfigPath, updateCh) @@ -140,7 +147,14 @@ func (k *Keepalived) Start(_ context.Context) error { k.reconcilerDone = reconcilerDone go func() { defer close(reconcilerDone) - k.watchReconcilerUpdates() + if len(k.Config.VirtualServers) > 0 { + k.watchReconcilerUpdatesKeepalived() + } else { + + if err := k.watchReconcilerUpdatesReverseProxy(); err != nil { + k.log.Errorf("failed to watch reconciler updates: %v", err) + } + } }() } return k.supervisor.Supervise() @@ -149,30 +163,33 @@ func (k *Keepalived) Start(_ context.Context) error { // Stops keepalived and cleans up the virtual IPs. This is done so that if the // k0s controller is stopped, it can still reach the other APIservers on the VIP func (k *Keepalived) Stop() error { - if k.reconciler != nil { - k.log.Infof("Stopping cplb-reconciler") - k.reconciler.Stop() - close(k.updateCh) - <-k.reconcilerDone - } + /* + if k.reconciler != nil { + k.log.Infof("Stopping cplb-reconciler") + k.reconciler.Stop() + close(k.updateCh) + <-k.reconcilerDone + } - k.log.Infof("Stopping keepalived") - k.supervisor.Stop() + k.log.Infof("Stopping keepalived") + k.supervisor.Stop() - k.log.Infof("Deleting dummy interface") - link, err := netlink.LinkByName(dummyLinkName) - if err != nil { - if errors.As(err, &netlink.LinkNotFoundError{}) { - return nil - } - k.log.Errorf("failed to get link by name %s. Attempting to delete it anyway: %v", dummyLinkName, err) - link = &netlink.Dummy{ - LinkAttrs: netlink.LinkAttrs{ - Name: dummyLinkName, - }, + k.log.Infof("Deleting dummy interface") + link, err := netlink.LinkByName(dummyLinkName) + if err != nil { + if errors.As(err, &netlink.LinkNotFoundError{}) { + return nil + } + k.log.Errorf("failed to get link by name %s. Attempting to delete it anyway: %v", dummyLinkName, err) + link = &netlink.Dummy{ + LinkAttrs: netlink.LinkAttrs{ + Name: dummyLinkName, + }, + } } - } - return netlink.LinkDel(link) + return netlink.LinkDel(link) + */ + return nil } // configureDummy creates the dummy interface and sets the virtual IPs on it. @@ -241,7 +258,7 @@ func (k *Keepalived) ensureLinkAddresses(linkName string, expectedAddresses []st } // Remove unexpected addresses - for i := range linkAddrs { + for i := 0; i < len(linkAddrs); i++ { strAddr := strAddrs[i] linkAddr := linkAddrs[i] if !slices.Contains(expectedAddresses, strAddrs[i]) { @@ -321,7 +338,89 @@ func (k *Keepalived) generateKeepalivedTemplate() error { return nil } -func (k *Keepalived) watchReconcilerUpdates() { +func (k *Keepalived) watchReconcilerUpdatesReverseProxy() error { + k.proxy = tcpproxy.Proxy{} + // We don't know how long until we get the first update, so initially we + // forward everything to localhost + fmt.Println("VALADAS: Adding route", fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), fmt.Sprintf("127.0.0.1:%d", k.APIPort)) + k.proxy.AddRoute(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), tcpproxy.To(fmt.Sprintf("127.0.0.1:%d", k.APIPort))) + + fmt.Println("VALADAS: starting proxy") + if err := k.proxy.Start(); err != nil { + return fmt.Errorf("failed to start proxy: %w", err) + } + fmt.Printf("VALADAS: proxy start address %p\n", &k.proxy) + fmt.Println("VALADAS: started proxy") + + // Do not create the proxy rules until we have the first update. + // This is to prevent that we are forwarding traffic to an apiserver + // which issn't work yet + fmt.Println("Waiting for updateCh") + <-k.updateCh + fmt.Println("VALADAS: Adding iptables rules") + for _, vrrp := range k.Config.VRRPInstances { + for _, vipCIDR := range vrrp.VirtualIPs { + vip := strings.Split(vipCIDR, "/")[0] + // create iptables rule + k.createProxyForwardRules(vip) + } + } + fmt.Println("VALADAS: Added iptables rules") + k.setProxyRoutes() + + for range k.updateCh { + k.setProxyRoutes() + } + return nil +} + +func (k *Keepalived) setProxyRoutes() { + fmt.Println("VALADAS: got update") + routes := []tcpproxy.Target{} + for _, addr := range k.reconciler.GetIPs() { + fmt.Println("VALADAS: appending route", addr) + routes = append(routes, tcpproxy.To(fmt.Sprintf("%s:%d", addr, k.APIPort))) + } + + for _, vrrp := range k.Config.VRRPInstances { + for _, vipCIDR := range vrrp.VirtualIPs { + vip := strings.Split(vipCIDR, "/")[0] + k.log.Infof("VALADAS: Updating routes for VIP %s with: %s", vip, routes) + k.proxy.SetRoutes(fmt.Sprintf(":%d", k.Config.UserSpaceProxyPort), routes) + + fmt.Printf("VALADAS: proxy setRoutes address %p\n", &k.proxy) + } + } +} + +func (k *Keepalived) createProxyForwardRules(vip string) { + fmt.Println("VALADAS: Adding iptables rule", vip, k.APIPort) + commands := [][]string{ + { + "-t", "nat", "-A", "PREROUTING", + "-p", "tcp", "-d", vip, "--dport", strconv.Itoa(k.APIPort), + "-j", "DNAT", "--to-destination", fmt.Sprintf("127.0.0.1:%d", k.Config.UserSpaceProxyPort), + }, + { + "-t", "nat", "-A", "OUTPUT", + "-p", "tcp", "-d", vip, "--dport", strconv.Itoa(k.APIPort), + "-j", "DNAT", "--to-destination", fmt.Sprintf("127.0.0.1:%d", k.Config.UserSpaceProxyPort), + }, + { + "-t", "nat", "-A", "POSTROUTING", + "-p", "tcp", "-d", "127.0.0.1", "--dport", strconv.Itoa(k.Config.UserSpaceProxyPort), + "-j", "MASQUERADE", + }, + } + + for _, cmdArgs := range commands { + cmd := exec.Command("/var/lib/k0s/bin/iptables", cmdArgs...) + if err := cmd.Run(); err != nil { + log.Fatalf("Failed to execute iptables command: %v", err) + } + } +} +func (k *Keepalived) watchReconcilerUpdatesKeepalived() { // Wait for the supervisor to start keepalived before // watching for endpoint changes process := k.supervisor.GetProcess() @@ -368,15 +467,15 @@ const dummyLinkName = "dummyvip0" var keepalivedConfigTemplate = template.Must(template.New("keepalived").Parse(` {{ range $i, $instance := .VRRPInstances }} vrrp_instance k0s-vip-{{$i}} { - # All servers must have state BACKUP so that when a new server comes up - # it doesn't perform a failover. This must be combined with the priority. + # All servers must have state BACKUP so that when a new server comes up + # it doesn't perform a failover. This must be combined with the priority. state BACKUP - # Make sure the interface is aligned with your server's network interface. + # Make sure the interface is aligned with your server's network interface interface {{ .Interface }} - # The virtual router ID must be unique to each VRRP instance that you define. + # The virtual router ID must be unique to each VRRP instance that you define virtual_router_id {{ $instance.VirtualRouterID }} # All servers have the same priority so that when a new one comes up we don't - # do a failover. + # do a failover priority 200 # advertisement interval, 1 second by default advert_int {{ $instance.AdvertIntervalSeconds }} diff --git a/pkg/config/cli.go b/pkg/config/cli.go index 2cca62e56b25..4e4f4b75aba3 100644 --- a/pkg/config/cli.go +++ b/pkg/config/cli.go @@ -85,6 +85,7 @@ type WorkerOptions struct { TokenArg string WorkerProfile string IPTablesMode string + DisableIPTables bool } func (o *ControllerOptions) Normalize() error { diff --git a/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml b/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml index d8b49378b7ef..7cdacaa02f3d 100644 --- a/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml +++ b/static/_crds/k0s/k0s.k0sproject.io_clusterconfigs.yaml @@ -508,6 +508,15 @@ spec: Keepalived contains configuration options related to the "Keepalived" type of load balancing. properties: + userSpaceProxyBindPort: + default: 6444 + description: |- + UserspaceProxyPort is the port where the userspace proxy will bind + to. This port is only exposed on the localhost interface and is only + used internally. Defaults to 6444. + maximum: 65535 + minimum: 1 + type: integer virtualServers: description: |- Configuration options related to the virtual servers. This is an array