diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1d48fa6732c..4b40e32bada 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1822,6 +1822,7 @@ func (s *EtcdServer) apply( zap.Stringer("type", e.Type)) switch e.Type { case raftpb.EntryNormal: + // gofail: var beforeApplyOneEntryNormal struct{} s.applyEntryNormal(&e) s.setAppliedIndex(e.Index) s.setTerm(e.Term) diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 55b31336ba3..4d91a53a215 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -316,7 +316,7 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) { ) require.NoError(t, err) - epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg) + epc, err = e2e.StartEtcdProcessCluster(ctx, t, epc, cfg) require.NoError(t, err) t.Cleanup(func() { diff --git a/tests/e2e/discovery_v3_test.go b/tests/e2e/discovery_v3_test.go index 8262f373678..e718530a3ee 100644 --- a/tests/e2e/discovery_v3_test.go +++ b/tests/e2e/discovery_v3_test.go @@ -120,5 +120,5 @@ func bootstrapEtcdClusterUsingV3Discovery(t *testing.T, discoveryEndpoints []str } // start the cluster - return e2e.StartEtcdProcessCluster(context.TODO(), epc, cfg) + return e2e.StartEtcdProcessCluster(context.TODO(), t, epc, cfg) } diff --git a/tests/e2e/http_health_check_test.go b/tests/e2e/http_health_check_test.go new file mode 100644 index 00000000000..8007b48f634 --- /dev/null +++ b/tests/e2e/http_health_check_test.go @@ -0,0 +1,226 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "io" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/framework/testutils" +) + +type healthCheckConfig struct { + url string + expectedStatusCode int + expectedTimeoutError bool +} + +func TestHTTPHealthHandler(t *testing.T) { + e2e.BeforeTest(t) + client := &http.Client{} + tcs := []struct { + name string + injectFailure func(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) + clusterOptions []e2e.EPClusterOption + healthChecks []healthCheckConfig + }{ + { + name: "no failures", // happy case + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1)}, + healthChecks: []healthCheckConfig{ + { + url: "/health", + expectedStatusCode: http.StatusOK, + }, + }, + }, + { + name: "activated no space alarm", + injectFailure: triggerNoSpaceAlarm, + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithQuotaBackendBytes(int64(13 * os.Getpagesize()))}, + healthChecks: []healthCheckConfig{ + { + url: "/health", + expectedStatusCode: http.StatusServiceUnavailable, + }, + { + url: "/health?exclude=NOSPACE", + expectedStatusCode: http.StatusOK, + }, + }, + }, + { + name: "overloaded server slow apply", + injectFailure: triggerSlowApply, + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithGoFailEnabled(true)}, + healthChecks: []healthCheckConfig{ + { + url: "/health?serializable=true", + expectedStatusCode: http.StatusOK, + }, + { + url: "/health?serializable=false", + expectedTimeoutError: true, + }, + }, + }, + { + name: "network partitioned", + injectFailure: blackhole, + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(3), e2e.WithIsPeerTLS(true), e2e.WithPeerProxy(true)}, + healthChecks: []healthCheckConfig{ + { + url: "/health?serializable=true", + expectedStatusCode: http.StatusOK, + }, + { + url: "/health?serializable=false", + expectedTimeoutError: true, + expectedStatusCode: http.StatusServiceUnavailable, + // old leader may return "etcdserver: leader changed" error with 503 in ReadIndex leaderChangedNotifier + }, + }, + }, + { + name: "raft loop deadlock", + injectFailure: triggerRaftLoopDeadLock, + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)}, + healthChecks: []healthCheckConfig{ + { + // current kubeadm etcd liveness check failed to detect raft loop deadlock in steady state + // ref. https://github.com/kubernetes/kubernetes/blob/master/cmd/kubeadm/app/phases/etcd/local.go#L225-L226 + // current liveness probe depends on the etcd /health check has a flaw that new /livez check should resolve. + url: "/health?serializable=true", + expectedStatusCode: http.StatusOK, + }, + { + url: "/health?serializable=false", + expectedTimeoutError: true, + }, + }, + }, + // verify that auth enabled serializable read must go through mvcc + { + name: "slow buffer write back with auth enabled", + injectFailure: triggerSlowBufferWriteBackWithAuth, + clusterOptions: []e2e.EPClusterOption{e2e.WithClusterSize(1), e2e.WithGoFailEnabled(true)}, + healthChecks: []healthCheckConfig{ + { + url: "/health?serializable=true", + expectedTimeoutError: true, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, tc.clusterOptions...) + require.NoError(t, err) + defer clus.Close() + testutils.ExecuteUntil(ctx, t, func() { + if tc.injectFailure != nil { + tc.injectFailure(ctx, t, clus) + } + + for _, hc := range tc.healthChecks { + requestURL := clus.Procs[0].EndpointsHTTP()[0] + hc.url + t.Logf("health check URL is %s", requestURL) + doHealthCheckAndVerify(t, client, requestURL, hc.expectedStatusCode, hc.expectedTimeoutError) + } + }) + }) + } +} + +func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expectStatusCode int, expectTimeoutError bool) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + require.NoErrorf(t, err, "failed to creat request %+v", err) + resp, herr := client.Do(req) + cancel() + if expectTimeoutError { + if herr != nil && strings.Contains(herr.Error(), context.DeadlineExceeded.Error()) { + return + } + } + require.NoErrorf(t, herr, "failed to get response %+v", err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoErrorf(t, err, "failed to read response %+v", err) + + t.Logf("health check response body is: %s", body) + require.Equal(t, expectStatusCode, resp.StatusCode) +} + +func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) { + buf := strings.Repeat("b", os.Getpagesize()) + etcdctl := clus.Etcdctl() + for { + if err := etcdctl.Put(ctx, "foo", buf, config.PutOptions{}); err != nil { + if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") { + t.Fatal(err) + } + break + } + } +} + +func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) { + // the following proposal will be blocked at applying stage + // because when apply index < committed index, linearizable read would time out. + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", `sleep("3s")`)) + require.NoError(t, clus.Procs[1].Etcdctl().Put(ctx, "foo", "bar", config.PutOptions{})) +} + +func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) { + member := clus.Procs[0] + proxy := member.PeerProxy() + t.Logf("Blackholing traffic from and to member %q", member.Config().Name) + proxy.BlackholeTx() + proxy.BlackholeRx() +} + +func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) { + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("3s")`)) + clus.Procs[0].Etcdctl().Put(context.Background(), "foo", "bar", config.PutOptions{}) +} + +func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) { + etcdctl := clus.Etcdctl() + _, err := etcdctl.UserAdd(ctx, "root", "root", config.UserAddOptions{}) + require.NoError(t, err) + _, err = etcdctl.UserGrantRole(ctx, "root", "root") + require.NoError(t, err) + require.NoError(t, etcdctl.AuthEnable(ctx)) + + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeWritebackBuf", `sleep("3s")`)) + clus.Procs[0].Etcdctl(e2e.WithAuth("root", "root")).Put(context.Background(), "foo", "bar", config.PutOptions{Timeout: 200 * time.Millisecond}) +} diff --git a/tests/framework/config/client.go b/tests/framework/config/client.go index e570c9c119f..ac82bd54792 100644 --- a/tests/framework/config/client.go +++ b/tests/framework/config/client.go @@ -42,6 +42,7 @@ type GetOptions struct { type PutOptions struct { LeaseID clientv3.LeaseID + Timeout time.Duration } type DeleteOptions struct { diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 8ab7345366a..180c7484bda 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -369,7 +369,7 @@ func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterO return nil, err } - return StartEtcdProcessCluster(ctx, epc, cfg) + return StartEtcdProcessCluster(ctx, t, epc, cfg) } // InitEtcdProcessCluster initializes a new cluster based on the given config. @@ -409,7 +409,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP } // StartEtcdProcessCluster launches a new cluster from etcd processes. -func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { +func StartEtcdProcessCluster(ctx context.Context, t testing.TB, epc *EtcdProcessCluster, cfg *EtcdProcessClusterConfig) (*EtcdProcessCluster, error) { if cfg.RollingStart { if err := epc.RollingStart(ctx); err != nil { return nil, fmt.Errorf("cannot rolling-start: %v", err) @@ -420,6 +420,13 @@ func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg * } } + for _, proc := range epc.Procs { + if cfg.GoFailEnabled && !proc.Failpoints().Enabled() { + epc.Close() + t.Skip("please run 'make gofail-enable && make build' before running the test") + } + } + return epc, nil } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index f6d53d3f1b6..cb8f4a20bf7 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -373,6 +373,14 @@ var httpClient = http.Client{ Timeout: 10 * time.Millisecond, } +func (f *BinaryFailpoints) Enabled() bool { + _, err := failpoints(f.member) + if err != nil { + return false + } + return true +} + func (f *BinaryFailpoints) Available(failpoint string) bool { if f.availableCache == nil { fs, err := failpoints(f.member) diff --git a/tests/framework/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go index 020acbbfacb..32589684e44 100644 --- a/tests/framework/e2e/etcdctl.go +++ b/tests/framework/e2e/etcdctl.go @@ -158,6 +158,9 @@ func (ctl *EtcdctlV3) Put(ctx context.Context, key, value string, opts config.Pu if opts.LeaseID != 0 { args = append(args, "--lease", strconv.FormatInt(int64(opts.LeaseID), 16)) } + if opts.Timeout != 0 { + args = append(args, fmt.Sprintf("--command-timeout=%s", opts.Timeout)) + } _, err := SpawnWithExpectLines(ctx, args, nil, expect.ExpectedResponse{Value: "OK"}) return err } diff --git a/tests/framework/integration/integration.go b/tests/framework/integration/integration.go index 5d8a2aa05b1..41fd78a8655 100644 --- a/tests/framework/integration/integration.go +++ b/tests/framework/integration/integration.go @@ -169,6 +169,11 @@ func (c integrationClient) Get(ctx context.Context, key string, o config.GetOpti } func (c integrationClient) Put(ctx context.Context, key, value string, opts config.PutOptions) error { + if opts.Timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, opts.Timeout) + defer cancel() + } var clientOpts []clientv3.OpOption if opts.LeaseID != 0 { clientOpts = append(clientOpts, clientv3.WithLease(opts.LeaseID))