1
0
Fork 0

New chain processor

This commit is contained in:
Gregory Eremin 2015-02-11 13:33:15 +07:00
parent 30c8711798
commit 500669b2ab
3 changed files with 70 additions and 144 deletions

View File

@ -1,5 +1,10 @@
package core
import (
"bytes"
"sync"
)
type (
Chain struct {
Flow Flow
@ -11,8 +16,21 @@ type (
}
)
const (
LF = byte(10)
)
func (c *Chain) Call(data []byte) (resp []byte, err error) {
return data, nil
switch c.Flow {
case SequentialFlow:
return c.processSequentially(data)
case ParallelFlow:
return c.processInParallel(data)
case DelayedFlow:
return c.processDelayed(data)
default:
panic("Unreachable")
}
}
func (c *Chain) Units() []string {
@ -32,3 +50,52 @@ func (c *Chain) Units() []string {
return uniq
}
func (c *Chain) processSequentially(data []byte) (resp []byte, err error) {
for _, caller := range c.Links {
if resp, err = caller.Call(data); err != nil {
return
}
}
return
}
func (c *Chain) processInParallel(data []byte) (resp []byte, err error) {
var (
inbox = make(chan []byte) // This channel must be unbuffered
buf bytes.Buffer
wg sync.WaitGroup
)
for _, caller := range c.Links {
wg.Add(1)
go func() {
if res, err := caller.Call(data); err == nil {
inbox <- res
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(inbox)
}()
for {
if res, ok := <-inbox; ok {
buf.Write(res)
buf.WriteByte(LF) // Add linebreak
} else {
break
}
}
return buf.Bytes(), nil
}
func (c *Chain) processDelayed(data []byte) (resp []byte, err error) {
for _, caller := range c.Links {
go caller.Call(data)
}
return data, nil
}

View File

@ -1,142 +0,0 @@
package core
import (
"encoding/json"
"io/ioutil"
"reflect"
"sync"
"code.google.com/p/go.net/context"
)
func NewChain(name string) (chain interface{}, ok bool) {
chains := readChains()
chain, ok = chains[name]
return
}
func ProcessChain(ctx context.Context, chain interface{}) context.Context {
ctx = processSequentialChain(ctx, []interface{}{chain})
return ctx
}
func processSequentialChain(ctx context.Context, chain []interface{}) context.Context {
for _, link := range chain {
val := reflect.ValueOf(link)
switch val.Kind() {
case reflect.Map:
lmap := link.(map[string]interface{})
if subchain, ok := lmap["s"]; ok {
ctx = processSequentialChain(ctx, subchain.([]interface{}))
} else if subchain, ok := lmap["p"]; ok {
ctx = processParallelChain(ctx, subchain.([]interface{}))
} else if subchain, ok := lmap["d"]; ok {
go processSequentialChain(ctx, subchain.([]interface{}))
}
case reflect.String:
unitName := val.String()
unit, ok := Units[unitName]
if !ok {
panic("Unknown unit: " + unitName)
}
// Execution of a unit
ctx = unit(ctx)
default:
panic("Unexpected chain element: " + val.Kind().String())
}
}
return ctx
}
func processParallelChain(ctx context.Context, chain []interface{}) context.Context {
var (
wg sync.WaitGroup
contexts = make(chan context.Context, len(chain))
finished = make(chan struct{})
)
for _, link := range chain {
val := reflect.ValueOf(link)
switch val.Kind() {
case reflect.Map:
lmap := link.(map[string]interface{})
if subchain, ok := lmap["s"]; ok {
wg.Add(1)
go func(ctx context.Context) {
contexts <- processSequentialChain(ctx, subchain.([]interface{}))
wg.Done()
}(ctx)
} else if subchain, ok := lmap["p"]; ok {
wg.Add(1)
go func(ctx context.Context) {
contexts <- processParallelChain(ctx, subchain.([]interface{}))
wg.Done()
}(ctx)
} else if subchain, ok := lmap["d"]; ok {
go processSequentialChain(ctx, subchain.([]interface{}))
}
case reflect.String:
unitName := val.String()
unit, ok := Units[unitName]
if !ok {
panic("Unknown unit: " + unitName)
}
// Execution of a unit
wg.Add(1)
go func(ctx context.Context) {
contexts <- unit(ctx)
wg.Done()
}(ctx)
default:
panic("Unexpected chain element: " + val.Kind().String())
}
}
go func() {
wg.Wait()
finished <- struct{}{}
}()
ctx = context.WithValue(ctx, "parallel_contexts", contexts)
ctx = context.WithValue(ctx, "parallel_finished", finished)
return ctx
}
func readChains() map[string]interface{} {
b, err := ioutil.ReadFile("config/chains.json")
if err != nil {
panic(err)
}
var chains map[string]interface{}
if err := json.Unmarshal(b, &chains); err != nil {
panic(err)
}
return chains
}
func SyncronizeParallelChain(ctx context.Context, fn func(context.Context)) {
var (
pcontexts = ctx.Value("parallel_contexts").(chan context.Context)
pfinished = ctx.Value("parallel_finished").(chan struct{})
working = true
)
for len(pcontexts) > 0 || working {
select {
case pctx := <-pcontexts:
fn(pctx)
case <-pfinished:
working = false
}
}
}

View File

@ -21,7 +21,8 @@ func (s *Supervisor) StartAll() {
func (s *Supervisor) Start(unit string) {
fmt.Println("Starting unit: " + unit)
cmd := exec.Command(Conf().Python.BinPath, Conf().Python.WrapperPath, unit)
conf := Conf().Python
cmd := exec.Command(conf.BinPath, conf.WrapperPath, unit)
cmd.Stdout = os.Stdout // Sorry
if err := cmd.Start(); err != nil {
fmt.Println("Failed to start unit: ", unit)