Skip to content

Commit

Permalink
Merge pull request #2 from RyouZhang/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
RyouZhang authored Jul 11, 2024
2 parents bf64af6 + 509b413 commit bb8da63
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 38 deletions.
7 changes: 7 additions & 0 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flow

type IBrick interface {
Name() string
AddLifeCycle(l ILifeCycle)
}

type IInput interface {
Expand All @@ -24,3 +25,9 @@ type IError interface {
type IRoute interface {
RouteOutput(func(*Message) bool) <-chan *Message
}

type ILifeCycle interface {
Add(int)
Done()
Wait()
}
9 changes: 8 additions & 1 deletion base_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
)

type BaseBrick struct {
name string
name string
lc ILifeCycle

kernal func(*Message) (*Message, error)
errQueue chan error
outQueue chan *Message
Expand All @@ -15,6 +17,10 @@ func (b *BaseBrick) Name() string {
return b.name
}

func (b *BaseBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *BaseBrick) Linked(inQueue <-chan *Message) {
b.loop(inQueue)
}
Expand All @@ -31,6 +37,7 @@ func (b *BaseBrick) loop(inQueue <-chan *Message) {
defer func() {
close(b.errQueue)
close(b.outQueue)
b.lc.Done()
}()
for msg := range inQueue {
res, err := async.Safety(func() (interface{}, error) {
Expand Down
43 changes: 19 additions & 24 deletions board.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,27 @@ import (

type Board struct {
name string
wg sync.WaitGroup
bricks map[string]IBrick
errHandler func(string, error)
lc ILifeCycle
}

func NewBoard(name string) *Board {
return &Board{
name: name,
bricks: make(map[string]IBrick),
lc: &sync.WaitGroup{},
}
}

func NewBoardWithLifeCycle(name string, lc ILifeCycle) *Board {
if lc == nil {
return nil
}
return &Board{
name: name,
bricks: make(map[string]IBrick),
lc: lc,
}
}

Expand All @@ -29,6 +41,9 @@ func (b *Board) Add(bricks ...IBrick) *Board {
_, ok := b.bricks[brick.Name()]
if false == ok {
b.bricks[brick.Name()] = brick
// add life cycle
b.lc.Add(1)
brick.AddLifeCycle(b.lc)

if _, ok := brick.(IError); ok {
go b.onError(brick.(IBrick).Name(), brick.(IError).Errors())
Expand All @@ -49,19 +64,10 @@ func (b *Board) Connect(outName string, inName string) *Board {
if false == ok {
panic(errors.New(fmt.Sprintf("Invalid Brick %s", inName)))
}
b.connectBrick(out.(IOutput), in.(IInput))
in.(IInput).Linked(out.(IOutput).Output())
return b
}

func (b *Board) connectBrick(out IOutput, in IInput) {
b.wg.Add(1)
target := out.Output()
go func() {
defer b.wg.Done()
in.Linked(target)
}()
}

func (b *Board) RouteConnect(outName string, inName string, method func(*Message) bool) *Board {
out, ok := b.bricks[outName]
if false == ok {
Expand All @@ -71,26 +77,15 @@ func (b *Board) RouteConnect(outName string, inName string, method func(*Message
if false == ok {
panic(errors.New(fmt.Sprintf("Invalid Brick %s", inName)))
}
b.routeConnectBrick(out.(IRoute), in.(IInput), method)
in.(IInput).Linked(out.(IRoute).RouteOutput(method))
return b
}

func (b *Board) routeConnectBrick(out IRoute, in IInput, method func(*Message) bool) {
b.wg.Add(1)
target := out.RouteOutput(method)
go func() {
defer b.wg.Done()
in.Linked(target)
}()
}

func (b *Board) Start() {
for _, brick := range b.bricks {
ob, ok := brick.(IEntry)
if ok {
b.wg.Add(1)
go func() {
defer b.wg.Done()
ob.Start()
}()
}
Expand All @@ -104,7 +99,7 @@ func (b *Board) Stop() {
ob.Stop()
}
}
b.wg.Wait()
b.lc.Wait()
}

func (b *Board) onError(name string, inQueue <-chan error) {
Expand Down
11 changes: 10 additions & 1 deletion input_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
)

type InputBrick struct {
name string
name string
lc ILifeCycle

kernal func(chan<- *Message, chan<- error, <-chan bool)
shutdown chan bool

Expand All @@ -21,6 +23,10 @@ func (b *InputBrick) Name() string {
return b.name
}

func (b *InputBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *InputBrick) Output() <-chan *Message {
b.useDefaultOut = true
return b.outQueue
Expand All @@ -44,6 +50,9 @@ func (b *InputBrick) Start() {
}

func (b *InputBrick) Stop() {
defer func() {
b.lc.Done()
}()
b.shutdown <- true
close(b.shutdown)
}
Expand Down
9 changes: 8 additions & 1 deletion logic_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
)

type LogicBrick struct {
name string
name string
lc ILifeCycle

workers chan bool
wg sync.WaitGroup
kernal func(*Message, chan<- *Message) error
Expand All @@ -30,6 +32,10 @@ func (b *LogicBrick) Name() string {
return b.name
}

func (b *LogicBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *LogicBrick) Linked(queue <-chan *Message) {
b.inMux.Add(1)
go func() {
Expand Down Expand Up @@ -114,6 +120,7 @@ func (b *LogicBrick) pump() {
}
close(b.outQueue)
close(b.errQueue)
b.lc.Done()
}

func NewLogicBrick(
Expand Down
13 changes: 11 additions & 2 deletions merge_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
)

type MergeBrick struct {
name string
name string
lc ILifeCycle

once sync.Once
wg sync.WaitGroup
outQueue chan *Message
Expand All @@ -15,14 +17,21 @@ func (b *MergeBrick) Name() string {
return b.name
}

func (b *MergeBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *MergeBrick) Output() <-chan *Message {
return b.outQueue
}

func (b *MergeBrick) Linked(inQueue <-chan *Message) {
b.wg.Add(1)
go func() {
defer b.wg.Done()
defer func() {
b.wg.Done()
b.lc.Done()
}()
for msg := range inQueue {
b.outQueue <- msg
}
Expand Down
11 changes: 9 additions & 2 deletions output_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
)

type OutputBrick struct {
name string
name string
lc ILifeCycle

kernal func(*Message) error
gracefulStop func()
errQueue chan error
Expand All @@ -15,8 +17,12 @@ func (b *OutputBrick) Name() string {
return b.name
}

func (b *OutputBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *OutputBrick) Linked(inQueue <-chan *Message) {
b.loop(inQueue)
go b.loop(inQueue)
}

func (b *OutputBrick) Errors() <-chan error {
Expand All @@ -26,6 +32,7 @@ func (b *OutputBrick) Errors() <-chan error {
func (b *OutputBrick) loop(inQueue <-chan *Message) {
defer func() {
close(b.errQueue)
b.lc.Done()
}()
for msg := range inQueue {
_, err := async.Safety(func() (interface{}, error) {
Expand Down
9 changes: 8 additions & 1 deletion parallel_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
)

type ParallelBrick struct {
name string
name string
lc ILifeCycle

hash func(*Message) int
subQueues []chan *Message
wg sync.WaitGroup
Expand All @@ -31,6 +33,10 @@ func (b *ParallelBrick) Name() string {
return b.name
}

func (b *ParallelBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *ParallelBrick) Linked(queue <-chan *Message) {
b.inMux.Add(1)
go func() {
Expand Down Expand Up @@ -107,6 +113,7 @@ func (b *ParallelBrick) pump() {
}
close(b.outQueue)
close(b.errQueue)
b.lc.Done()
}

func (b *ParallelBrick) worker(queue <-chan *Message) {
Expand Down
14 changes: 13 additions & 1 deletion priority_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
)

type PriorityBrick struct {
name string
name string
lc ILifeCycle

workers chan bool
wg sync.WaitGroup
kernal func(*Message, chan<- *Message) error
Expand All @@ -31,6 +33,10 @@ func (b *PriorityBrick) Name() string {
return b.name
}

func (b *PriorityBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *PriorityBrick) Linked(queue <-chan *Message) {
b.inQueueMux.Lock()
defer b.inQueueMux.Unlock()
Expand Down Expand Up @@ -86,6 +92,9 @@ func (b *PriorityBrick) handler(queue <-chan *Message) error {
}

func (b *PriorityBrick) loop() {
defer func() {
close(b.resQueue)
}()
flagKey := 0
for {
PULL:
Expand Down Expand Up @@ -113,6 +122,9 @@ func (b *PriorityBrick) loop() {
}

func (b *PriorityBrick) pump() {
defer func() {
b.lc.Done()
}()
for msg := range b.resQueue {
flag := false
for _, item := range b.outQueues {
Expand Down
11 changes: 9 additions & 2 deletions reduce_brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
)

type ReduceBrick struct {
name string
name string
lc ILifeCycle

windowSeconds int //sec
msgKeys []string
msgDic map[string][]*Message
Expand All @@ -22,8 +24,12 @@ func (b *ReduceBrick) Name() string {
return b.name
}

func (b *ReduceBrick) AddLifeCycle(lc ILifeCycle) {
b.lc = lc
}

func (b *ReduceBrick) Linked(inQueue <-chan *Message) {
b.loop(inQueue)
go b.loop(inQueue)
}

func (b *ReduceBrick) Output() <-chan *Message {
Expand Down Expand Up @@ -130,6 +136,7 @@ End:
}
close(b.outQueue)
close(b.errQueue)
b.lc.Done()
}

func NewReduceBrick(
Expand Down
Loading

0 comments on commit bb8da63

Please sign in to comment.