Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(scorecard): long-running report generation websocket handling #989

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 86 additions & 6 deletions internal/test/scorecard/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"golang.org/x/net/websocket"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -422,15 +423,43 @@ func (client *RecordingClient) Delete(ctx context.Context, target *Target, recor
return nil
}

func (client *RecordingClient) RequestReportGeneration(ctx context.Context, target *Target, recording *Recording) (*string, error) {
func (client *RecordingClient) GenerateReport(ctx context.Context, target *Target, recording *Recording) (map[string]interface{}, error) {
if len(recording.ReportURL) < 1 {
return nil, fmt.Errorf("report URL is not available")
}

reportURL := client.Base.JoinPath(recording.ReportURL)
fmt.Printf("Client Base URL: %s\n", client.Base.String())
wsURL := client.Base.JoinPath("/api/notifications")
wsURL.Scheme = "wss"
fmt.Printf("WebSocket notifications URL: %s\n", wsURL.String())

// Authentication for OpenShift SSO
config, err := config.GetConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster configurations: %s", err.Error())
}

wsHeader := make(http.Header)
wsHeader.Add("Authorization", fmt.Sprintf("Bearer %s", config.BearerToken))
wsOrigin, err := url.Parse("http://localhost/")
if err != nil {
return nil, fmt.Errorf("failed to parse WebSocket-Origin: %s", err.Error())
}
wsCfg := websocket.Config{
Origin: wsOrigin,
Location: wsURL,
Header: wsHeader,
Version: websocket.ProtocolVersionHybi13,
}
ws, err := wsCfg.DialContext(ctx)
// defer ws.Close()
if err != nil {
return nil, fmt.Errorf("WebSocket connection failed: %s", err.Error())
}

header := make(http.Header)

reportURL := client.Base.JoinPath(recording.ReportURL)
resp, err := SendRequest(ctx, client.Client, http.MethodGet, reportURL.String(), nil, header)
if err != nil {
return nil, err
Expand All @@ -440,13 +469,64 @@ func (client *RecordingClient) RequestReportGeneration(ctx context.Context, targ
if !StatusOK(resp.StatusCode) {
return nil, fmt.Errorf("API request failed with status code: %d, response body: %s, and headers:\n%s", resp.StatusCode, ReadError(resp), ReadHeader(resp))
}
// TODO refactor
if resp.StatusCode == 200 {
report := make(map[string]interface{}, 0)
err = ReadJSON(resp, &report)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %s", err.Error())
}

report, err := ReadString(resp)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %s", err.Error())
return report, nil
} else if resp.StatusCode == 202 {
asyncJobId, err := ReadString(resp)
if err != nil {
return nil, fmt.Errorf("failed to read async job ID: %s", err.Error())
}

asyncMsg := AsyncJobNotification{}
err = websocket.JSON.Receive(ws, &asyncMsg)
if err != nil {
return nil, fmt.Errorf("failed to read JSON notification: %s", err.Error())
}
expectedCategory := "ReportSuccess"
if asyncMsg.Meta.Category != expectedCategory {
// TODO loop on notifications for some time until this is observed?
return nil, fmt.Errorf("Notification received had the wrong category. Expected: %s . Got: %s", expectedCategory, asyncMsg.Meta.Category)
}
if asyncJobId != asyncMsg.Message.JobID {
return nil, fmt.Errorf("Notification received had the wrong job ID. Expected: %s . Got: %s", asyncJobId, asyncMsg.Message.JobID)
}
resp2, err := SendRequest(ctx, client.Client, http.MethodGet, reportURL.String(), nil, header)
if err != nil {
return nil, err
}
defer resp2.Body.Close()

report := make(map[string]interface{}, 0)
err = ReadJSON(resp2, &report)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %s", err.Error())
}

return report, nil
} else {
return nil, fmt.Errorf("Unexpected status code: %d", resp.StatusCode)
}
}

// {"meta":{"category":"ReportSuccess"},"message":{"jobId":"be321b87-55fb-4812-8eef-ced46eed8795"}}
type AsyncJobNotification struct {
Meta MessageMeta `json:"meta"`
Message AsyncJobMessage `json:"message"`
}

type MessageMeta struct {
Category string `json:"category"`
}

return &report, nil
type AsyncJobMessage struct {
JobID string `json:"jobId"`
}

func (client *RecordingClient) ListArchives(ctx context.Context, target *Target) ([]Archive, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/test/scorecard/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ func CryostatRecordingTest(bundle *apimanifests.Bundle, namespace string, openSh
}
r.Log += fmt.Sprintf("current list of archives: %+v\n", archives)

reportJobId, err := apiClient.Recordings().RequestReportGeneration(context.Background(), target, rec)
report, err := apiClient.Recordings().GenerateReport(context.Background(), target, rec)
if err != nil {
return r.fail(fmt.Sprintf("failed to generate report for the recording: %s", err.Error()))
}
r.Log += fmt.Sprintf("report generation job ID for the recording %s: %+v\n", rec.Name, *reportJobId)
r.Log += fmt.Sprintf("generated report for the recording %s: %+v\n", rec.Name, report)

// Stop the recording
err = apiClient.Recordings().Stop(context.Background(), target, rec.Id)
Expand Down
Loading