Skip to content

Commit

Permalink
Add status change event
Browse files Browse the repository at this point in the history
Will propagated to client via SSE in ith own channel, to inform clients about CRC status change

Signed-off-by: Yevhen Vydolob <[email protected]>
  • Loading branch information
evidolob committed Dec 11, 2023
1 parent d2f1234 commit c695ae9
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/crc/api/events/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewEventServer(machine machine.Client) *EventServer {

sseServer.CreateStream(Logs)
sseServer.CreateStream(ClusterLoad)
sseServer.CreateStream(StatusChange)
return eventServer
}

Expand All @@ -69,6 +70,8 @@ func createEventStream(server *EventServer, streamID string) EventStream {
return newLogsStream(server)
case ClusterLoad:
return newClusterLoadStream(server)
case StatusChange:
return newStatusChangeStream(server)
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/crc/api/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package events
import "github.com/r3labs/sse/v2"

const (
Logs = "logs" // Logs event channel, contains daemon logs
ClusterLoad = "cluster_load" // status event channel, contains VM load info
Logs = "logs" // Logs event channel, contains daemon logs
ClusterLoad = "cluster_load" // status event channel, contains VM load info
StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc
)

type EventPublisher interface {
Expand Down
69 changes: 69 additions & 0 deletions pkg/crc/api/events/status_change_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events

import (
"encoding/json"

"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/crc-org/crc/v2/pkg/crc/machine"
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
"github.com/crc-org/crc/v2/pkg/events"
"github.com/r3labs/sse/v2"
)

type statusChangeEvent struct {
Status *types.ClusterStatusResult `json:"status"`
Error string `json:"error,omitempty"`
}

type statusChangeListener struct {
machineClient machine.Client
publisher EventPublisher
}

func newStatusChangeStream(server *EventServer) EventStream {
return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer))
}

func newStatusChangeListener(client machine.Client) EventProducer {
return &statusChangeListener{
machineClient: client,
}
}

func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) {
logging.Debugf("State Changed Event %s", changedEvent)
var event statusChangeEvent
status, err := st.machineClient.Status()
// if we cannot receive actual state, send error state with error description
if err != nil {
event = statusChangeEvent{Status: &types.ClusterStatusResult{
CrcStatus: state.Error,
}, Error: err.Error()}
} else {
// event could be fired, before actual code, which change state is called
// so status could contain 'old' state, replace it with state received in event
status.CrcStatus = changedEvent.State // override with actual reported state
event = statusChangeEvent{Status: status}
if changedEvent.Error != nil {
event.Error = changedEvent.Error.Error()
}

}
data, err := json.Marshal(event)
if err != nil {
logging.Errorf("Could not serealize status changed event in to JSON: %s", err)
return
}
st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data})
}

func (st *statusChangeListener) Start(publisher EventPublisher) {
st.publisher = publisher
events.StatusChanged.AddListener(st)

}

func (st *statusChangeListener) Stop() {
events.StatusChanged.RemoveListener(st)
}
28 changes: 27 additions & 1 deletion pkg/crc/machine/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset"
"github.com/crc-org/crc/v2/pkg/events"
)

const startCancelTimeout = 15 * time.Second
Expand Down Expand Up @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error {

err := s.underlying.Delete()
s.syncOperationDone <- Deleting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM})
}
return err
}

Expand All @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error {
}
s.startCancel = startCancel
s.currentState = Starting
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting})

return nil
}
Expand All @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig)

startResult, err := s.underlying.Start(ctx, startConfig)
s.syncOperationDone <- Starting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}

return startResult, err
}

Expand Down Expand Up @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) {
if err := s.prepareStopDelete(Stopping); err != nil {
return state.Error, err
}
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping})

st, err := s.underlying.Stop()
s.syncOperationDone <- Stopping

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: st})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}
return st, err
}

Expand All @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) {
}

func (s *Synchronized) PowerOff() error {
return s.underlying.PowerOff()
err := s.underlying.PowerOff()
if err != nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}

return err
}

func (s *Synchronized) Status() (*types.ClusterStatusResult, error) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
)

type StatusChangedEvent struct {
State state.State
Error error
}

var (
StatusChanged = NewEvent[StatusChangedEvent]()
)

0 comments on commit c695ae9

Please sign in to comment.