Skip to content

Commit

Permalink
try to fix versions for go-mod according to go1.13
Browse files Browse the repository at this point in the history
  • Loading branch information
sheb-gregor committed Nov 19, 2019
1 parent 6d730a3 commit bd1c5b9
Show file tree
Hide file tree
Showing 23 changed files with 1,200 additions and 270 deletions.
360 changes: 214 additions & 146 deletions chief.go

Large diffs are not rendered by default.

71 changes: 65 additions & 6 deletions context.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,76 @@
package uwe

import "context"
import (
"context"
"fmt"
)

type Context interface {
type WorkerName string

type Message struct {
UID int64
Target WorkerName
Sender WorkerName
Data interface{}
}

type WContext interface {
context.Context
SendMessage(target WorkerName, data interface{}) error
MessageBus() <-chan *Message
}

type ctx struct {
type wContext struct {
context.Context

name WorkerName

// in is channel for incoming messages for a worker
in chan *Message
// out is channel for outgoing messages from a worker
out chan<- *Message
}

func NewContext(name WorkerName, ctx context.Context, in, out chan *Message) WContext {
return &wContext{
Context: ctx,
name: name,
in: in,
out: out,
}

}

func NewContext() Context {
return ctx{
Context: context.Background(),
func (wc *wContext) SendMessage(target WorkerName, data interface{}) error {
wc.out <- &Message{
UID: 0,
Target: target,
Sender: wc.name,
Data: data,
}
return nil
}

func (wc *wContext) MessageBus() <-chan *Message {
return wc.in
}

type execStatus string

const (
signalOk execStatus = "ok"
signalInterrupted execStatus = "interrupted"
signalFailure execStatus = "failure"
signalStop execStatus = "stop"
signalUnexpectedStop execStatus = "unexpected_stop"
)

type workerSignal struct {
name WorkerName
sig execStatus
msg string
}

func (s *workerSignal) Error() string {
return fmt.Sprintf("%s(%v); %s", s.name, s.sig, s.msg)
}
81 changes: 81 additions & 0 deletions examples/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"context"
"fmt"
"time"

"github.com/lancer-kit/uwe"
"github.com/sirupsen/logrus"
)

type dummy struct {
name string
logger *logrus.Entry
}

func (d dummy) Init(ctx context.Context) uwe.Worker {
logger, ok := ctx.Value(uwe.CtxKeyLog).(*logrus.Entry)
if !ok {
logger = logrus.NewEntry(logrus.New())
}
return &dummy{
name: d.name,
logger: logger.WithField("worker", d.name),
}
}

func (dummy) RestartOnFail() bool {
return true
}

func (d dummy) Run(wCtx uwe.WContext) uwe.ExitCode {
ticker := time.NewTicker(time.Second)

for {
select {
case <-ticker.C:
d.logger.Info("Perform my task")
switch d.name {
case "dummy-1":
_ = wCtx.SendMessage("dummy-2", "Hi, Johnny")
_ = wCtx.SendMessage("dummy-3", "Hi, Johnny")
case "dummy-2":
_ = wCtx.SendMessage("dummy-1", "Hi, Johnny")
_ = wCtx.SendMessage("dummy-3", "Hi, Johnny")
case "dummy-3":
_ = wCtx.SendMessage("dummy-1", "Hi, Johnny")
_ = wCtx.SendMessage("dummy-2", "Hi, Johnny")
}

case m := <-wCtx.MessageBus():
d.logger.
WithField("Sender", m.Sender).
WithField("Target", m.Target).
WithField("data", fmt.Sprintf("%+v", m.Data)).
Info("got new message")
case <-wCtx.Done():
ticker.Stop()
d.logger.Info("Receive exit code, stop working")
return uwe.ExitCodeOk
}
}

}

func main() {
chief := uwe.NewChief(
"chief",
true,
logrus.New().WithField("env", "example"),
)

chief.AddWorker("dummy-1", &dummy{name: "dummy-1"})
chief.AddWorker("dummy-2", &dummy{name: "dummy-2"})
chief.AddWorker("dummy-3", &dummy{name: "dummy-3"})

err := chief.Run("dummy-1", "dummy-2", "dummy-3")
if err != nil {
logrus.Fatal(err)
}
}
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
module github.com/lancer-kit/uwe/v2
module github.com/lancer-kit/uwe

go 1.12

require (
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-chi/chi v4.0.2+incompatible
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
github.com/lancer-kit/sam v0.0.0-20190828205034-ab78e42fc7ce
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0 // indirect
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.3.0 // indirect
)
22 changes: 10 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi v4.0.2+incompatible h1:maB6vn6FqCxrpz4FqWdh4+lwpyZIQS7YEAUcHlgXVRs=
github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible h1:msy24VGS42fKO9K1vLz82/GeYW1cILu7Nuuj1N3BBkE=
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/lancer-kit/sam v0.0.0-20190828205034-ab78e42fc7ce h1:GB+TZbq3MPukFjt6XtzWsNEqF/27lGkOHZeRyLGUkZg=
github.com/lancer-kit/sam v0.0.0-20190828205034-ab78e42fc7ce/go.mod h1:dJSKzw9vZqK0nwXplFhVC97C0/TRg7soUQzCd5GtloY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
29 changes: 29 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package uwe

import "context"

// Worker is an interface for async workers
// which launches and manages by the `Chief`.
type Worker interface {
// Init initializes new instance of the `Worker` implementation,
// this context should be used only as Key/Value transmitter,
// DO NOT use it for `<- ctx.Done()`
Init(ctx context.Context) Worker
// RestartOnFail determines the need to restart the worker, if it stopped.
RestartOnFail() bool
// Run starts the `Worker` instance execution.
Run(ctx WContext) ExitCode
}

type ExitCode int

const (
// ExitCodeOk means that the worker is stopped.
ExitCodeOk ExitCode = iota
// ExitCodeInterrupted means that the work cycle has been interrupted and can be restarted.
ExitCodeInterrupted
// ExitCodeFailed means that the worker fails.
ExitCodeFailed
// ExitNeedReInit means that the worker can't do job and requires reinitialization.
ExitReinitReq
)
Loading

0 comments on commit bd1c5b9

Please sign in to comment.