From 500669b2ab38b4bf649e3f5419943ef84863952a Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Wed, 11 Feb 2015 13:33:15 +0700 Subject: [PATCH] New chain processor --- core/chain.go | 69 ++++++++++++++++++- core/chain_processor.go | 142 ---------------------------------------- core/supervisor.go | 3 +- 3 files changed, 70 insertions(+), 144 deletions(-) delete mode 100644 core/chain_processor.go diff --git a/core/chain.go b/core/chain.go index 0f1e0ee..80977ea 100644 --- a/core/chain.go +++ b/core/chain.go @@ -1,5 +1,10 @@ package core +import ( + "bytes" + "sync" +) + type ( Chain struct { Flow Flow @@ -11,8 +16,21 @@ type ( } ) +const ( + LF = byte(10) +) + func (c *Chain) Call(data []byte) (resp []byte, err error) { - return data, nil + switch c.Flow { + case SequentialFlow: + return c.processSequentially(data) + case ParallelFlow: + return c.processInParallel(data) + case DelayedFlow: + return c.processDelayed(data) + default: + panic("Unreachable") + } } func (c *Chain) Units() []string { @@ -32,3 +50,52 @@ func (c *Chain) Units() []string { return uniq } + +func (c *Chain) processSequentially(data []byte) (resp []byte, err error) { + for _, caller := range c.Links { + if resp, err = caller.Call(data); err != nil { + return + } + } + return +} + +func (c *Chain) processInParallel(data []byte) (resp []byte, err error) { + var ( + inbox = make(chan []byte) // This channel must be unbuffered + buf bytes.Buffer + wg sync.WaitGroup + ) + + for _, caller := range c.Links { + wg.Add(1) + go func() { + if res, err := caller.Call(data); err == nil { + inbox <- res + } + wg.Done() + }() + } + go func() { + wg.Wait() + close(inbox) + }() + + for { + if res, ok := <-inbox; ok { + buf.Write(res) + buf.WriteByte(LF) // Add linebreak + } else { + break + } + } + + return buf.Bytes(), nil +} + +func (c *Chain) processDelayed(data []byte) (resp []byte, err error) { + for _, caller := range c.Links { + go caller.Call(data) + } + return data, nil +} diff --git a/core/chain_processor.go b/core/chain_processor.go deleted file mode 100644 index 5544785..0000000 --- a/core/chain_processor.go +++ /dev/null @@ -1,142 +0,0 @@ -package core - -import ( - "encoding/json" - "io/ioutil" - "reflect" - "sync" - - "code.google.com/p/go.net/context" -) - -func NewChain(name string) (chain interface{}, ok bool) { - chains := readChains() - chain, ok = chains[name] - - return -} - -func ProcessChain(ctx context.Context, chain interface{}) context.Context { - ctx = processSequentialChain(ctx, []interface{}{chain}) - - return ctx -} - -func processSequentialChain(ctx context.Context, chain []interface{}) context.Context { - for _, link := range chain { - val := reflect.ValueOf(link) - - switch val.Kind() { - case reflect.Map: - lmap := link.(map[string]interface{}) - if subchain, ok := lmap["s"]; ok { - ctx = processSequentialChain(ctx, subchain.([]interface{})) - } else if subchain, ok := lmap["p"]; ok { - ctx = processParallelChain(ctx, subchain.([]interface{})) - } else if subchain, ok := lmap["d"]; ok { - go processSequentialChain(ctx, subchain.([]interface{})) - } - case reflect.String: - unitName := val.String() - unit, ok := Units[unitName] - if !ok { - panic("Unknown unit: " + unitName) - } - - // Execution of a unit - ctx = unit(ctx) - default: - panic("Unexpected chain element: " + val.Kind().String()) - } - } - - return ctx -} - -func processParallelChain(ctx context.Context, chain []interface{}) context.Context { - var ( - wg sync.WaitGroup - contexts = make(chan context.Context, len(chain)) - finished = make(chan struct{}) - ) - - for _, link := range chain { - val := reflect.ValueOf(link) - - switch val.Kind() { - case reflect.Map: - lmap := link.(map[string]interface{}) - if subchain, ok := lmap["s"]; ok { - wg.Add(1) - go func(ctx context.Context) { - contexts <- processSequentialChain(ctx, subchain.([]interface{})) - wg.Done() - }(ctx) - } else if subchain, ok := lmap["p"]; ok { - wg.Add(1) - go func(ctx context.Context) { - contexts <- processParallelChain(ctx, subchain.([]interface{})) - wg.Done() - }(ctx) - } else if subchain, ok := lmap["d"]; ok { - go processSequentialChain(ctx, subchain.([]interface{})) - } - case reflect.String: - unitName := val.String() - unit, ok := Units[unitName] - if !ok { - panic("Unknown unit: " + unitName) - } - - // Execution of a unit - wg.Add(1) - go func(ctx context.Context) { - contexts <- unit(ctx) - wg.Done() - }(ctx) - default: - panic("Unexpected chain element: " + val.Kind().String()) - } - } - - go func() { - wg.Wait() - finished <- struct{}{} - }() - - ctx = context.WithValue(ctx, "parallel_contexts", contexts) - ctx = context.WithValue(ctx, "parallel_finished", finished) - - return ctx -} - -func readChains() map[string]interface{} { - b, err := ioutil.ReadFile("config/chains.json") - if err != nil { - panic(err) - } - - var chains map[string]interface{} - if err := json.Unmarshal(b, &chains); err != nil { - panic(err) - } - - return chains -} - -func SyncronizeParallelChain(ctx context.Context, fn func(context.Context)) { - var ( - pcontexts = ctx.Value("parallel_contexts").(chan context.Context) - pfinished = ctx.Value("parallel_finished").(chan struct{}) - working = true - ) - - for len(pcontexts) > 0 || working { - select { - case pctx := <-pcontexts: - fn(pctx) - case <-pfinished: - working = false - } - } -} diff --git a/core/supervisor.go b/core/supervisor.go index c63f422..f019c5f 100644 --- a/core/supervisor.go +++ b/core/supervisor.go @@ -21,7 +21,8 @@ func (s *Supervisor) StartAll() { func (s *Supervisor) Start(unit string) { fmt.Println("Starting unit: " + unit) - cmd := exec.Command(Conf().Python.BinPath, Conf().Python.WrapperPath, unit) + conf := Conf().Python + cmd := exec.Command(conf.BinPath, conf.WrapperPath, unit) cmd.Stdout = os.Stdout // Sorry if err := cmd.Start(); err != nil { fmt.Println("Failed to start unit: ", unit)