Skip to content

Commit

Permalink
Multidirectional pub and sub
Browse files Browse the repository at this point in the history
  • Loading branch information
antoniomika committed Sep 10, 2024
1 parent 8d55c02 commit 8f72eba
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 167 deletions.
16 changes: 16 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/authorized_keys/main.go",
"cwd": "${workspaceFolder}"
}
]
}
69 changes: 43 additions & 26 deletions cmd/authorized_keys/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"log/slog"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

"github.com/antoniomika/syncmap"
"github.com/charmbracelet/ssh"
"github.com/charmbracelet/wish"
"github.com/google/uuid"
"github.com/picosh/pubsub"
)

Expand All @@ -32,7 +35,6 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
return
}

ctx := sesh.Context()
cmd := strings.TrimSpace(args[0])
channel := args[1]
logger := cfg.Logger.With(
Expand All @@ -45,38 +47,32 @@ func PubSubMiddleware(cfg *pubsub.Cfg) wish.Middleware {
if cmd == "help" {
wish.Println(sesh, "USAGE: ssh send.pico.sh (sub|pub) {channel}")
} else if cmd == "sub" {
sub := &pubsub.Subscriber{
Name: channel,
sub := &pubsub.Sub{
ID: uuid.NewString(),
Writer: sesh,
Chan: make(chan error),
Done: make(chan struct{}),
Data: make(chan []byte),
}

go func() {
<-ctx.Done()
err := cfg.PubSub.UnSub(sub)
if err != nil {
wish.Errorln(sesh, err)
}
<-sesh.Context().Done()
sub.Cleanup()
}()
err := cfg.PubSub.Sub(sub)
if err != nil {
wish.Errorln(sesh, err)
}

cfg.PubSub.Sub(channel, sub)

Check failure on line 62 in cmd/authorized_keys/main.go

View workflow job for this annotation

GitHub Actions / test

Error return value of `cfg.PubSub.Sub` is not checked (errcheck)
} else if cmd == "pub" {
msg := &pubsub.Msg{
Name: channel,
pub := &pubsub.Pub{
ID: uuid.NewString(),
Done: make(chan struct{}),
Reader: sesh,
}

go func() {
<-ctx.Done()
err := cfg.PubSub.UnPub(msg)
if err != nil {
wish.Errorln(sesh, err)
}
<-sesh.Context().Done()
pub.Cleanup()
}()
err := cfg.PubSub.Pub(msg)
if err != nil {
wish.Errorln(sesh, err)
}

cfg.PubSub.Pub(channel, pub)

Check failure on line 75 in cmd/authorized_keys/main.go

View workflow job for this annotation

GitHub Actions / test

Error return value of `cfg.PubSub.Pub` is not checked (errcheck)
} else {
wish.Println(sesh, "USAGE: ssh send.pico.sh (sub|pub) {channel}")
}
Expand All @@ -94,12 +90,13 @@ func main() {
cfg := &pubsub.Cfg{
Logger: logger,
PubSub: &pubsub.PubSubMulticast{
Logger: logger,
Chan: make(chan *pubsub.Subscriber),
Logger: logger,
Channels: syncmap.New[string, *pubsub.Channel](),
},
}

s, err := wish.NewServer(
ssh.NoPty(),
wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
wish.WithAuthorizedKeys(keyPath),
Expand All @@ -125,6 +122,26 @@ func main() {
}
}()

go func() {
for {
slog.Info("Debug Info", slog.Int("goroutines", runtime.NumGoroutine()))
select {
case <-time.After(5 * time.Second):
for _, channel := range cfg.PubSub.GetChannels() {
slog.Info("channel online", slog.Any("channel", channel.Name))
for _, pub := range cfg.PubSub.GetPubs(channel.Name) {
slog.Info("pub online", slog.Any("channel", channel.Name), slog.Any("pub", pub.ID))
}
for _, sub := range cfg.PubSub.GetSubs(channel.Name) {
slog.Info("sub online", slog.Any("channel", channel.Name), slog.Any("sub", sub.ID))
}
}
case <-done:
return
}
}
}()

<-done
logger.Info("stopping SSH server")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (

require (
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/antoniomika/syncmap v1.0.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/bubbletea v0.27.0 // indirect
github.com/charmbracelet/keygen v0.5.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/charmbracelet/bubbletea v0.27.0 h1:Mznj+vvYuYagD9Pn2mY7fuelGvP0HAXtZYGgRBCbHvU=
Expand Down
Loading

0 comments on commit 8f72eba

Please sign in to comment.