Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiple instances support for grpc broker. #235

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ jobs:
make e2e-test
env:
container_tool: docker
SERVER_REPLICAS: 2
MESSAGE_DRIVER_TYPE: grpc
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ test/e2e/.consumer_name
test/e2e/.external_host_ip
test/e2e/report/*
unit-test-results.json
integration-test-results.json
*integration-test-results.json
test/e2e/setup/aro/aro-hcp
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
7 changes: 6 additions & 1 deletion cmd/maestro/servecmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"github.com/openshift-online/maestro/cmd/maestro/environments"
"github.com/openshift-online/maestro/cmd/maestro/server"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"
"github.com/openshift-online/maestro/pkg/dispatcher"
"github.com/openshift-online/maestro/pkg/event"
)
Expand Down Expand Up @@ -47,9 +49,11 @@ func runServer(cmd *cobra.Command, args []string) {
// For gRPC, create a gRPC broker to handle resource spec and status events.
// For MQTT/Kafka, create a message queue based event server to handle resource spec and status events.
var eventServer server.EventServer
var eventFilter controllers.EventFilter
if environments.Environment().Config.MessageBroker.MessageBrokerType == "grpc" {
klog.Info("Setting up grpc broker")
eventServer = server.NewGRPCBroker(eventBroadcaster)
eventFilter = controllers.NewPredicatedEventFilter(eventServer.PredicateEvent)
} else {
klog.Info("Setting up message queue event server")
var statusDispatcher dispatcher.Dispatcher
Expand All @@ -67,12 +71,13 @@ func runServer(cmd *cobra.Command, args []string) {
// Set the status dispatcher for the healthcheck server
healthcheckServer.SetStatusDispatcher(statusDispatcher)
eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
}

// Create the servers
apiserver := server.NewAPIServer(eventBroadcaster)
metricsServer := server.NewMetricsServer()
controllersServer := server.NewControllersServer(eventServer)
controllersServer := server.NewControllersServer(eventServer, eventFilter)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/openshift-online/maestro/pkg/logger"
)

func NewControllersServer(eventServer EventServer) *ControllersServer {
func NewControllersServer(eventServer EventServer, eventFilter controllers.EventFilter) *ControllersServer {
s := &ControllersServer{
KindControllerManager: controllers.NewKindControllerManager(
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
eventFilter,
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
Expand Down
8 changes: 8 additions & 0 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type EventServer interface {

// OnStatusUpdate handles status update events for a resource.
OnStatusUpdate(ctx context.Context, eventID, resourceID string) error

// PredicateEvent returns true if the event should be processed by the current instance, otherwise false.
// A non-nil error is returned if the check itself fails.
PredicateEvent(ctx context.Context, eventID string) (bool, error)
}

var _ EventServer = &MessageQueueEventServer{}
Expand Down Expand Up @@ -145,6 +148,11 @@ func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, r
)
}

// PredicateEvent is a no-op for the message queue event server: it always returns true with a nil error,
// as the message queue event server filters events based on locks instead.
func (s *MessageQueueEventServer) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
return true, nil
}

// handleStatusUpdate processes the resource status update from the agent.
// The resource argument contains the updated status.
// The function performs the following steps:
Expand Down
96 changes: 76 additions & 20 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ import (
"github.com/openshift-online/maestro/pkg/services"
)

type resourceHandler func(res *api.Resource) error
// resourceHandler processes a resource spec by encoding it to a CloudEvent and sending it to the subscriber.
// It returns a bool indicating if the connection is closed and an error if one occurs.
// - Returns true and an error if the connection is closed.
// - Returns false and an error if encoding fails.
// - Returns false and nil if successful.
type resourceHandler func(res *api.Resource) (bool, error)

// subscriber defines a subscriber that can receive and handle resource spec.
type subscriber struct {
Expand All @@ -51,6 +56,7 @@ type GRPCBroker struct {
instanceID string
eventInstanceDao dao.EventInstanceDao
resourceService services.ResourceService
eventService services.EventService
statusEventService services.StatusEventService
bindAddress string
subscribers map[string]*subscriber // registered subscribers
Expand Down Expand Up @@ -79,6 +85,7 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
instanceID: env().Config.MessageBroker.ClientID,
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
statusEventService: env().Services.StatusEvents(),
bindAddress: env().Config.HTTPServer.Hostname + ":" + config.BrokerBindPort,
subscribers: make(map[string]*subscriber),
Expand Down Expand Up @@ -180,35 +187,55 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
return fmt.Errorf("invalid subscription request: missing cluster name")
}
// register the cluster for subscription to the resource spec
subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) error {
subscriberID, errChan := bkr.register(subReq.ClusterName, func(res *api.Resource) (bool, error) {
evt, err := encodeResourceSpec(res)
if err != nil {
return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err)
// return the error to requeue the event if encoding fails (e.g., due to invalid resource spec).
return false, fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err)
}

klog.V(4).Infof("send the event to spec subscribers, %s", evt)

// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
pbEvt := &pbv1.CloudEvent{}
if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil {
return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err)
// return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent).
return false, fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", res.ID, err)
}

// send the cloudevent to the subscriber
// TODO: error handling to address errors beyond network issues.
klog.V(4).Infof("sending the event to spec subscribers, %s", evt)
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
// Return true to ensure the subscriber will be unregistered when sending fails, which will close the subserver stream.
// See: https://github.com/grpc/grpc-go/blob/b615b35c4feb932a0ac658fb86b7127f10ef664e/stream.go#L1537 for more details.
// Return the error without wrapping, as it contains the gRPC error code and message for future (TODO) handling beyond network issues.
// This will also not requeue the event, as the error will cause the connection to the subscriber to be closed.
// If the subscriber (agent) reconnects, rely on the agent's resync to retrieve the missing resource spec.
return true, err
}

return nil
return false, nil
})

select {
case err := <-errChan:
// An error occurred while sending the event to the subscriber.
// This could be due to multiple reasons:
// see: https://grpc.io/docs/guides/error/
// 1. general errors such as: deadline exceeded before returning the response.
// 2. network errors such as: connection closed by intermediate proxy.
// 3. protocol errors such as: compression error or flow control error.
// In all above cases, unregister the subscriber.
// TODO: unregister the subscriber if the error is a network error and the connection could be re-established.
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
bkr.unregister(subscriberID)
return err
case <-subServer.Context().Done():
// The context of the stream has been canceled or completed.
// This could happen if:
// - The client closed the connection or canceled the stream.
// - The server closed the stream, potentially due to a shutdown.
// Regardless of the reason, unregister the subscriber and stop processing.
// No error is returned here because the stream closure is expected.
bkr.unregister(subscriberID)
return nil
}
Expand Down Expand Up @@ -377,16 +404,23 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
}

// handleRes publishes the resource to the correct subscriber.
func (bkr *GRPCBroker) handleRes(resource *api.Resource) {
func (bkr *GRPCBroker) handleRes(resource *api.Resource) error {
bkr.mu.RLock()
defer bkr.mu.RUnlock()
for _, subscriber := range bkr.subscribers {
if subscriber.clusterName == resource.ConsumerName {
if err := subscriber.handler(resource); err != nil {
subscriber.errChan <- err
if isConnClosed, err := subscriber.handler(resource); err != nil {
if isConnClosed {
// if the connection is closed, write the error to the subscriber's error channel
// to ensure the subscriber is unregistered
subscriber.errChan <- err
}
// return the error to requeue the event if handling fails.
return err
}
}
}
return nil
}

// OnCreate is called by the controller when a resource is created on the maestro server.
Expand All @@ -396,9 +430,7 @@ func (bkr *GRPCBroker) OnCreate(ctx context.Context, id string) error {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleRes(resource)
}

// OnUpdate is called by the controller when a resource is updated on the maestro server.
Expand All @@ -408,9 +440,7 @@ func (bkr *GRPCBroker) OnUpdate(ctx context.Context, id string) error {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleRes(resource)
}

// OnDelete is called by the controller when a resource is deleted from the maestro server.
Expand All @@ -420,9 +450,7 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
return err
}

bkr.handleRes(resource)

return nil
return bkr.handleRes(resource)
}

// OnStatusUpdate will be called on each new status event inserted into db.
Expand All @@ -442,6 +470,34 @@ func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID s
)
}

// PredicateEvent checks if the event should be processed by the current instance
// by verifying the resource consumer name is in the subscriber list, ensuring the
// event will be only processed when the consumer is subscribed to the current broker.
func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool, error) {
evt, err := bkr.eventService.Get(ctx, eventID)
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by other instances.
if svcErr.Is404() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a log here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log added.

klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

// check if the consumer is subscribed to the broker
return bkr.IsConsumerSubscribed(resource.ConsumerName), nil
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
func (bkr *GRPCBroker) IsConsumerSubscribed(consumerName string) bool {
bkr.mu.RLock()
Expand Down
Loading