Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update part deletion to announce on commbus #9

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ccli_config.DEFAULT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
## catalog address
server_addr: "http://localhost/api/graphql"
##
## commbus host
bus_host: "localhost"
## logging
## there are 2 log levels:
## 1 - debugger
Expand Down
16 changes: 0 additions & 16 deletions ccli_config.yml

This file was deleted.

54 changes: 50 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
module wrs/catalog/ccli

go 1.19
go 1.22.0

toolchain go1.22.4

require (
bitbucket.wrs.com/scm/weststar/graphql-upload-go.git v1.0.0-rc.0
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/hasura/go-graphql-client v0.9.2
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.0
Expand All @@ -13,10 +15,54 @@ require (
)

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/apache/pulsar-client-go v0.14.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/term v0.19.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.34.1 // indirect
)

require (
bitbucket.wrs.com/scm/weststar/communication-bus.git v0.5.1
bitbucket.wrs.com/scm/weststar/pulsar-schemas-go.git v0.6.0
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
Expand All @@ -30,7 +76,7 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
nhooyr.io/websocket v1.8.7 // indirect
Expand Down
557 changes: 557 additions & 0 deletions go.sum

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,12 @@ func init() {
viper.AddConfigPath(".")
// read the config file
if err := viper.ReadInConfig(); err != nil {
if err != nil {
if err != errors.New("open ccli_config.yml: no such file or directory") {
fmt.Println("User configuration file not found. Please create ccli_config.yml and copy the contents of ccli_config.DEFAULT.yml.")
} else {
fmt.Println("Error reading in config file. Error:", err)
}
os.Exit(1)
if err != errors.New("open ccli_config.yml: no such file or directory") {
fmt.Println("User configuration file not found. Please create ccli_config.yml and copy the contents of ccli_config.DEFAULT.yml.")
} else {
fmt.Println("Error reading in config file. Error:", err)
}
os.Exit(1)
}
// unmarshal the config file parameters to a struct
if err := viper.Unmarshal(&configFile); err != nil {
Expand Down
9 changes: 9 additions & 0 deletions packages/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"log/slog"
"wrs/catalog/ccli/packages/config"
"wrs/catalog/ccli/packages/graphql"
"wrs/catalog/ccli/packages/pulsar"

pb "bitbucket.wrs.com/scm/weststar/pulsar-schemas-go.git"
graph "github.com/hasura/go-graphql-client"

"github.com/pkg/errors"
Expand Down Expand Up @@ -55,6 +57,13 @@ func Delete(configFile *config.ConfigData, client *graph.Client, indent string)
if err := graphql.DeletePart(context.Background(), client, argPartID, argRecursiveMode, argForcedMode); err != nil {
return errors.Wrapf(err, "error deleting part from catalog")
}
pulsarProducer, err := pulsar.NewPulsarPartProducer(nil, configFile.BusHost, "persistent://public/proto/part")
if err != nil {
return err
}
if err := pulsarProducer.SendPartSchemaValue(pb.PartAction_DELETE, argPartID); err != nil {
return errors.Wrapf(err, "error announcing part deletion")
}
fmt.Printf("Successfully deleted id: %s from catalog\n", argPartID)
}
return nil
Expand Down
1 change: 1 addition & 0 deletions packages/config/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ConfigData struct {
LogFile string `mapstructure:"log_file"`
LogLevel int64 `mapstructure:"log_level"`
JsonIndent int64 `mapstructure:"json_indent"`
BusHost string `mapstructure:"bus_host"`
}

// struct for storing io.writer
Expand Down
75 changes: 75 additions & 0 deletions packages/pulsar/struct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package pulsar

import (
"log/slog"

bus "bitbucket.wrs.com/scm/weststar/communication-bus.git"
pulsar "bitbucket.wrs.com/scm/weststar/communication-bus.git/pulsar"
pb "bitbucket.wrs.com/scm/weststar/pulsar-schemas-go.git"
apachepulsar "github.com/apache/pulsar-client-go/pulsar"
"github.com/google/uuid"
"github.com/pkg/errors"
)

// PulsarPartProducer handles parts and puts them onto part bus topic
type PulsarPartProducer struct {
logger *slog.Logger
partProducer bus.ProtoProducer[*pb.PartMessage]
}

func NewPulsarPartProducer(logger *slog.Logger, host string, partTopic string) (*PulsarPartProducer, error) {
ret := new(PulsarPartProducer)
if logger != nil {
ret.logger = logger
} else {
ret.logger = slog.Default()
}

// Create Pulsar Client
client, err := pulsar.NewPulsarClient(nil, host, 6650, "", false)
if err != nil {
return nil, errors.Wrapf(err, "could not instantiate Pulsar client")
}

partProducer, err := pulsar.NewPulsarProtoProducer[*pb.PartMessage](client, partTopic)
if err != nil {
return nil, errors.Wrapf(err, "could not create proto product line producer")

}
ret.partProducer = partProducer

ret.logger.Info("Created Pulsar Part Producer", slog.String("host", host), slog.String("topic", partTopic))

return ret, nil
}

// SendSchemaValue sends a Protobuf message onto the part topic
func (partProducer *PulsarPartProducer) SendPartSchemaValue(actionType pb.PartAction, partID string) error {

partUUID, err := uuid.Parse(partID)
if err != nil {
return err
}
id, err := partUUID.MarshalBinary()
if err != nil {
return err
}
value := pb.PartMessage{
Part: &pb.Part{
Id: id,
},
Action: actionType,
}

// Send to part topic
if messageID, err := partProducer.partProducer.Send(&value); err != nil {
return errors.Wrapf(err, "error producing part message")
} else {
switch t := messageID.(type) {
case apachepulsar.MessageID:
partProducer.logger.Debug("produced part message", slog.String("messageID", string(t.Serialize())))
}
}

return nil
}