commit 509f281f72500ef3b7738c2d05c7c92b82e59099 Author: Gregory Eremin Date: Sun Aug 24 19:56:55 2014 +0700 Initial commit diff --git a/calculate.go b/calculate.go new file mode 100644 index 0000000..1ce8670 --- /dev/null +++ b/calculate.go @@ -0,0 +1,17 @@ +package main + +import ( + "code.google.com/p/go.net/context" + "github.com/localhots/yeast/core" +) + +func main() { + chain, ok := core.NewChain("calculate") + if !ok { + println("Bad chain: calculate") + return + } + + ctx := context.Background() + core.ProcessChain(ctx, chain) +} diff --git a/config/chains.json b/config/chains.json new file mode 100644 index 0000000..abfb534 --- /dev/null +++ b/config/chains.json @@ -0,0 +1,22 @@ +{ + "calculate": { + "s": [ + "uuid", + "input_from_flag", + { + "p": [ + "power2", + "power3", + "power5" + ] + }, + "aggregator", + "logger", + { + "d": [ + "sleep" + ] + } + ] + } +} diff --git a/core/chain_processor.go b/core/chain_processor.go new file mode 100644 index 0000000..a9bd329 --- /dev/null +++ b/core/chain_processor.go @@ -0,0 +1,125 @@ +package core + +import ( + "encoding/json" + "io/ioutil" + "reflect" + "sync" + + "code.google.com/p/go.net/context" +) + +func NewChain(name string) (chain interface{}, ok bool) { + chains := readChains() + chain, ok = chains[name] + + return +} + +func ProcessChain(ctx context.Context, chain interface{}) context.Context { + ctx = processSequentialChain(ctx, []interface{}{chain}) + + return ctx +} + +func processSequentialChain(ctx context.Context, chain []interface{}) context.Context { + for _, link := range chain { + val := reflect.ValueOf(link) + + switch val.Kind() { + case reflect.Map: + lmap := link.(map[string]interface{}) + if subchain, ok := lmap["s"]; ok { + ctx = processSequentialChain(ctx, subchain.([]interface{})) + } else if subchain, ok := lmap["p"]; ok { + ctx = processParallelChain(ctx, subchain.([]interface{})) + } else if subchain, ok := lmap["d"]; ok { + go processSequentialChain(ctx, subchain.([]interface{})) + } + case reflect.String: + unitName := val.String() + unit, ok := Units[unitName] + if !ok { + panic("Unknown unit: " + unitName) + } + + // Execution of a unit + ctx = unit(ctx) + default: + panic("Unexpected chain element: " + val.Kind().String()) + } + } + + return ctx +} + +func processParallelChain(ctx context.Context, chain []interface{}) context.Context { + var ( + wg sync.WaitGroup + contexts = make(chan context.Context, len(chain)) + finished = make(chan struct{}) + ) + + for _, link := range chain { + val := reflect.ValueOf(link) + + switch val.Kind() { + case reflect.Map: + lmap := link.(map[string]interface{}) + if subchain, ok := lmap["s"]; ok { + wg.Add(1) + go func(ctx context.Context) { + contexts <- processSequentialChain(ctx, subchain.([]interface{})) + wg.Done() + }(ctx) + } else if subchain, ok := lmap["p"]; ok { + wg.Add(1) + go func(ctx context.Context) { + contexts <- processParallelChain(ctx, subchain.([]interface{})) + wg.Done() + }(ctx) + } else if subchain, ok := lmap["d"]; ok { + go processSequentialChain(ctx, subchain.([]interface{})) + } + case reflect.String: + unitName := val.String() + unit, ok := Units[unitName] + if !ok { + panic("Unknown unit: " + unitName) + } + + // Execution of a unit + wg.Add(1) + go func(ctx context.Context) { + contexts <- unit(ctx) + wg.Done() + }(ctx) + default: + panic("Unexpected chain element: " + val.Kind().String()) + } + } + + go func() { + wg.Wait() + finished <- struct{}{} + }() + + ctx = context.WithValue(ctx, "parallel_contexts", contexts) + ctx = context.WithValue(ctx, "parallel_finished", finished) + + return ctx +} + +func readChains() map[string]interface{} { + b, err := ioutil.ReadFile("config/chains.json") + if err != nil { + panic(err) + } + + var chains map[string]interface{} + if err := json.Unmarshal(b, &chains); err != nil { + panic(err) + } + + return chains +} diff --git a/core/units.go b/core/units.go new file mode 100644 index 0000000..1aa1ce4 --- /dev/null +++ b/core/units.go @@ -0,0 +1,28 @@ +package core + +import ( + "code.google.com/p/go.net/context" + "github.com/localhots/yeast/units/aggregator" + "github.com/localhots/yeast/units/input" + "github.com/localhots/yeast/units/logger" + "github.com/localhots/yeast/units/power" + "github.com/localhots/yeast/units/sleep" + "github.com/localhots/yeast/units/uuid" +) + +type ( + UnitsDict map[string]func(context.Context) context.Context +) + +var ( + Units = UnitsDict{ + "aggregator": aggregator.Call, + "logger": logger.Call, + "power2": power.Power2, + "power3": power.Power3, + "power5": power.Power5, + "input_from_flag": input.FromFlag, + "sleep": sleep.Call, + "uuid": uuid.Call, + } +) diff --git a/tools/parallel.go b/tools/parallel.go new file mode 100644 index 0000000..da147f6 --- /dev/null +++ b/tools/parallel.go @@ -0,0 +1,22 @@ +package tools + +import ( + "code.google.com/p/go.net/context" +) + +func SyncronizeParallelChain(ctx context.Context, fn func(context.Context)) { + var ( + pcontexts = ctx.Value("parallel_contexts").(chan context.Context) + pfinished = ctx.Value("parallel_finished").(chan struct{}) + working = true + ) + + for len(pcontexts) > 0 || working { + select { + case pctx := <-pcontexts: + fn(pctx) + case <-pfinished: + working = false + } + } +} diff --git a/units/aggregator/aggregator.go b/units/aggregator/aggregator.go new file mode 100644 index 0000000..6b7177f --- /dev/null +++ b/units/aggregator/aggregator.go @@ -0,0 +1,22 @@ +package aggregator + +import ( + "fmt" + + "code.google.com/p/go.net/context" + "github.com/localhots/yeast/tools" + "github.com/localhots/yeast/units/power" +) + +func Call(ctx context.Context) context.Context { + results := []string{} + + tools.SyncronizeParallelChain(ctx, func(ctx context.Context) { + r := ctx.Value("power_result").(power.PowerResult) + results = append(results, fmt.Sprintf("%d ^ %d = %d", r.Num, r.Power, r.Result)) + }) + + ctx = context.WithValue(ctx, "power_results", results) + + return ctx +} diff --git a/units/input/cli.go b/units/input/cli.go new file mode 100644 index 0000000..2adb84e --- /dev/null +++ b/units/input/cli.go @@ -0,0 +1,17 @@ +package input + +import ( + "flag" + + "code.google.com/p/go.net/context" +) + +func FromFlag(ctx context.Context) context.Context { + var num int + flag.IntVar(&num, "num", 0, "Pass this number") + flag.Parse() + + ctx = context.WithValue(ctx, "num", num) + + return ctx +} diff --git a/units/logger/logger.go b/units/logger/logger.go new file mode 100644 index 0000000..cc03442 --- /dev/null +++ b/units/logger/logger.go @@ -0,0 +1,20 @@ +package logger + +import ( + "fmt" + + "code.google.com/p/go.net/context" +) + +func Call(ctx context.Context) context.Context { + results := ctx.Value("power_results").([]string) + id := ctx.Value("id").(string) + + fmt.Println("Calculation result", id) + fmt.Println("Power results are:") + for _, r := range results { + fmt.Println(r) + } + + return ctx +} diff --git a/units/power/power.go b/units/power/power.go new file mode 100644 index 0000000..6b7075c --- /dev/null +++ b/units/power/power.go @@ -0,0 +1,37 @@ +package power + +import ( + "code.google.com/p/go.net/context" +) + +type ( + PowerResult struct { + Num int + Power int + Result int + } +) + +func Power2(ctx context.Context) context.Context { + v := ctx.Value("num").(int) + res := PowerResult{Num: v, Power: 2, Result: v * v} + ctx = context.WithValue(ctx, "power_result", res) + + return ctx +} + +func Power3(ctx context.Context) context.Context { + v := ctx.Value("num").(int) + res := PowerResult{Num: v, Power: 3, Result: v * v * v} + ctx = context.WithValue(ctx, "power_result", res) + + return ctx +} + +func Power5(ctx context.Context) context.Context { + v := ctx.Value("num").(int) + res := PowerResult{Num: v, Power: 5, Result: v * v * v * v * v} + ctx = context.WithValue(ctx, "power_result", res) + + return ctx +} diff --git a/units/sleep/sleep.go b/units/sleep/sleep.go new file mode 100644 index 0000000..3a70f96 --- /dev/null +++ b/units/sleep/sleep.go @@ -0,0 +1,15 @@ +package sleep + +import ( + "fmt" + "time" + + "code.google.com/p/go.net/context" +) + +func Call(ctx context.Context) context.Context { + fmt.Println("Going into sleep") + time.Sleep(5 * time.Second) + fmt.Println("Woke up") + return ctx +} diff --git a/units/uuid/uuid.go b/units/uuid/uuid.go new file mode 100644 index 0000000..665d6b3 --- /dev/null +++ b/units/uuid/uuid.go @@ -0,0 +1,12 @@ +package uuid + +import ( + "code.google.com/p/go-uuid/uuid" + "code.google.com/p/go.net/context" +) + +func Call(ctx context.Context) context.Context { + ctx = context.WithValue(ctx, "id", uuid.New()) + + return ctx +}