From a6a8fb402aea7bb6eabd9843482ab56f898cdb41 Mon Sep 17 00:00:00 2001 From: Yevhen Vydolob Date: Wed, 1 Nov 2023 14:16:22 +0200 Subject: [PATCH] Implement state change event Signed-off-by: Yevhen Vydolob --- cmd/crc/cmd/status.go | 2 +- pkg/crc/api/events/cluster_load_stream.go | 2 +- pkg/crc/api/events/event_server.go | 11 ++-- pkg/crc/api/events/events.go | 5 +- pkg/crc/api/events/log_stream.go | 9 +++- pkg/crc/api/events/status_change_stream.go | 62 ++++++++++++++++++++++ pkg/crc/machine/state/state.go | 2 +- pkg/crc/machine/status.go | 2 +- pkg/crc/machine/sync.go | 28 +++++++++- pkg/events/emitter.go | 8 +-- pkg/events/events.go | 14 +++++ 11 files changed, 128 insertions(+), 17 deletions(-) create mode 100644 pkg/crc/api/events/status_change_stream.go create mode 100644 pkg/events/events.go diff --git a/cmd/crc/cmd/status.go b/cmd/crc/cmd/status.go index 2b65aa71ca..95ff518473 100644 --- a/cmd/crc/cmd/status.go +++ b/cmd/crc/cmd/status.go @@ -147,7 +147,7 @@ func getStatus(client *daemonclient.Client, cacheDir string) *status { } return &status{Success: false, Error: crcErrors.ToSerializableError(err)} } - if clusterStatus.CrcStatus == string(state.Novm) { + if clusterStatus.CrcStatus == string(state.NoVM) { return &status{Success: false, Error: crcErrors.ToSerializableError(crcErrors.VMNotExist)} } var size int64 diff --git a/pkg/crc/api/events/cluster_load_stream.go b/pkg/crc/api/events/cluster_load_stream.go index be4b1ab12e..f128262953 100644 --- a/pkg/crc/api/events/cluster_load_stream.go +++ b/pkg/crc/api/events/cluster_load_stream.go @@ -20,7 +20,7 @@ type TickListener struct { } func newClusterLoadStream(server *EventServer) EventStream { - return newStream(newStatusListener(server.machine), newEventPublisher(CLUSTER_LOAD, server.sseServer)) + return newStream(newStatusListener(server.machine), newEventPublisher(ClusterLoad, server.sseServer)) } func newStatusListener(machine crcMachine.Client) EventProducer { diff --git a/pkg/crc/api/events/event_server.go b/pkg/crc/api/events/event_server.go index 80ba0dc529..1a7e5155c8 100644 --- a/pkg/crc/api/events/event_server.go +++ b/pkg/crc/api/events/event_server.go @@ -54,8 +54,9 @@ func NewEventServer(machine machine.Client) *EventServer { stream.RemoveSubscriber(sub) } - sseServer.CreateStream(LOGS) - sseServer.CreateStream(CLUSTER_LOAD) + sseServer.CreateStream(Logs) + sseServer.CreateStream(ClusterLoad) + sseServer.CreateStream(StatusChange) return eventServer } @@ -65,10 +66,12 @@ func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func createEventStream(server *EventServer, streamID string) EventStream { switch streamID { - case LOGS: + case Logs: return newLogsStream(server) - case CLUSTER_LOAD: + case ClusterLoad: return newClusterLoadStream(server) + case StatusChange: + return newStatusChangeStream(server) } return nil } diff --git a/pkg/crc/api/events/events.go b/pkg/crc/api/events/events.go index 1c2e5af580..027d35d7da 100644 --- a/pkg/crc/api/events/events.go +++ b/pkg/crc/api/events/events.go @@ -3,8 +3,9 @@ package events import "github.com/r3labs/sse/v2" const ( - LOGS = "logs" // Logs event channel, contains daemon logs - CLUSTER_LOAD = "cluster_load" // status event channel, contains VM load info + Logs = "logs" // Logs event channel, contains daemon logs + ClusterLoad = "cluster_load" // status event channel, contains VM load info + StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc ) type EventPublisher interface { diff --git a/pkg/crc/api/events/log_stream.go b/pkg/crc/api/events/log_stream.go index 2601a38e87..bbb60b067d 100644 --- a/pkg/crc/api/events/log_stream.go +++ b/pkg/crc/api/events/log_stream.go @@ -1,6 +1,8 @@ package events import ( + "bytes" + "github.com/crc-org/crc/v2/pkg/crc/logging" "github.com/r3labs/sse/v2" "github.com/sirupsen/logrus" @@ -23,7 +25,7 @@ func newSSEStreamHook(server *sse.Server) *streamHook { &logrus.JSONFormatter{ TimestampFormat: "", DisableTimestamp: false, - DisableHTMLEscape: false, + DisableHTMLEscape: true, DataKey: "", FieldMap: nil, CallerPrettyfier: nil, @@ -56,7 +58,10 @@ func (s *streamHook) Fire(entry *logrus.Entry) error { return err } - s.server.Publish(LOGS, &sse.Event{Event: []byte(LOGS), Data: line}) + // remove "Line Feed"("\n") character which add was added by json.Encoder + line = bytes.TrimRight(line, "\n") + + s.server.Publish(Logs, &sse.Event{Event: []byte(Logs), Data: line}) return nil } diff --git a/pkg/crc/api/events/status_change_stream.go b/pkg/crc/api/events/status_change_stream.go new file mode 100644 index 0000000000..3eda4e5d5b --- /dev/null +++ b/pkg/crc/api/events/status_change_stream.go @@ -0,0 +1,62 @@ +package events + +import ( + "encoding/json" + + "github.com/crc-org/crc/v2/pkg/crc/logging" + "github.com/crc-org/crc/v2/pkg/crc/machine" + "github.com/crc-org/crc/v2/pkg/crc/machine/state" + "github.com/crc-org/crc/v2/pkg/crc/machine/types" + "github.com/crc-org/crc/v2/pkg/events" + "github.com/r3labs/sse/v2" +) + +type statusChangeEvent struct { + Status *types.ClusterStatusResult `json:"status"` + Error string `json:"error,omitempty"` +} + +type statusChange struct { + listenerDisposable events.Disposable + machineClient machine.Client +} + +func newStatusChangeStream(server *EventServer) EventStream { + return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer)) +} + +func newStatusChangeListener(client machine.Client) EventProducer { + return &statusChange{ + machineClient: client, + } +} + +func (st *statusChange) Start(publisher EventPublisher) { + st.listenerDisposable = events.StatusChanged.AddListener(func(changedEvent events.StatusChangedEvent) { + logging.Debugf("State Changed Event %s", changedEvent) + var event statusChangeEvent + status, err := st.machineClient.Status() + // if we cannot receive actual state, send error state with error description + if err != nil { + event = statusChangeEvent{Status: &types.ClusterStatusResult{ + CrcStatus: state.Error, + }, Error: err.Error()} + } else { + status.CrcStatus = changedEvent.State // override with actual reported state + event = statusChangeEvent{Status: status} + if changedEvent.Error != nil { + event.Error = changedEvent.Error.Error() + } + + } + data, _ := json.Marshal(event) + publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data}) + }) +} + +func (st *statusChange) Stop() { + if st.listenerDisposable != nil { + st.listenerDisposable() + st.listenerDisposable = nil + } +} diff --git a/pkg/crc/machine/state/state.go b/pkg/crc/machine/state/state.go index ca9b2940a3..e3cddf5857 100644 --- a/pkg/crc/machine/state/state.go +++ b/pkg/crc/machine/state/state.go @@ -10,7 +10,7 @@ const ( Stopped State = "Stopped" Stopping State = "Stopping" Starting State = "Starting" - Novm State = "NoVM" + NoVM State = "NoVM" Error State = "Error" ) diff --git a/pkg/crc/machine/status.go b/pkg/crc/machine/status.go index f3218deafe..7396b8657c 100644 --- a/pkg/crc/machine/status.go +++ b/pkg/crc/machine/status.go @@ -20,7 +20,7 @@ func (client *client) Status() (*types.ClusterStatusResult, error) { } if !exists { return &types.ClusterStatusResult{ - CrcStatus: state.Novm, + CrcStatus: state.NoVM, }, nil } diff --git a/pkg/crc/machine/sync.go b/pkg/crc/machine/sync.go index 6da384b415..fe53086d1b 100644 --- a/pkg/crc/machine/sync.go +++ b/pkg/crc/machine/sync.go @@ -10,6 +10,7 @@ import ( "github.com/crc-org/crc/v2/pkg/crc/machine/state" "github.com/crc-org/crc/v2/pkg/crc/machine/types" crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset" + "github.com/crc-org/crc/v2/pkg/events" ) const startCancelTimeout = 15 * time.Second @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error { err := s.underlying.Delete() s.syncOperationDone <- Deleting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM}) + } return err } @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error { } s.startCancel = startCancel s.currentState = Starting + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting}) return nil } @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig) startResult, err := s.underlying.Start(ctx, startConfig) s.syncOperationDone <- Starting + + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + return startResult, err } @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) { if err := s.prepareStopDelete(Stopping); err != nil { return state.Error, err } + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping}) st, err := s.underlying.Stop() s.syncOperationDone <- Stopping + if err == nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: st}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } return st, err } @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) { } func (s *Synchronized) PowerOff() error { - return s.underlying.PowerOff() + err := s.underlying.PowerOff() + if err != nil { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped}) + } else { + events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err}) + } + + return err } func (s *Synchronized) Status() (*types.ClusterStatusResult, error) { diff --git a/pkg/events/emitter.go b/pkg/events/emitter.go index 185af7d5f3..4b646fe18c 100644 --- a/pkg/events/emitter.go +++ b/pkg/events/emitter.go @@ -31,12 +31,14 @@ func NewEvent[T any]() Event[T] { func (e *event[T]) AddListener(listener Listener[T]) Disposable { e.eventMutex.Lock() + defer e.eventMutex.Unlock() + e.keyCounter++ key := e.keyCounter e.listeners[key] = &listenerWrapper[T]{ Listener: listener, } - e.eventMutex.Unlock() + return func() { e.eventMutex.Lock() defer e.eventMutex.Unlock() @@ -46,9 +48,7 @@ func (e *event[T]) AddListener(listener Listener[T]) Disposable { } func (e *event[T]) Fire(event T) { - e.eventMutex.Lock() - defer e.eventMutex.Unlock() for _, l := range e.listeners { - l.Listener(event) + go l.Listener(event) } } diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000000..42a9865c7b --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,14 @@ +package events + +import ( + "github.com/crc-org/crc/v2/pkg/crc/machine/state" +) + +type StatusChangedEvent struct { + State state.State + Error error +} + +var ( + StatusChanged = NewEvent[StatusChangedEvent]() +)