Chain is a package

This commit is contained in:
2015-02-11 16:03:28 +07:00
parent ed17fbb9ef
commit 18ca775e02
2 changed files with 30 additions and 25 deletions
-101
View File
@@ -1,101 +0,0 @@
package core
import (
"bytes"
"sync"
)
type (
Chain struct {
Flow Flow
Links []Caller
}
Caller interface {
Call([]byte) ([]byte, error)
Units() []string
}
)
const (
LF = byte(10)
)
func (c *Chain) Call(data []byte) (resp []byte, err error) {
switch c.Flow {
case SequentialFlow:
return c.processSequentially(data)
case ParallelFlow:
return c.processInParallel(data)
case DelayedFlow:
return c.processDelayed(data)
default:
panic("Unreachable")
}
}
func (c *Chain) Units() []string {
// Collecting unique unit names using map
units := map[string]*struct{}{}
for _, caller := range c.Links {
for _, unit := range caller.Units() {
units[unit] = nil
}
}
// Extracting names to a slice
uniq := []string{}
for unit, _ := range units {
uniq = append(uniq, unit)
}
return uniq
}
func (c *Chain) processSequentially(data []byte) (resp []byte, err error) {
for _, caller := range c.Links {
if resp, err = caller.Call(data); err != nil {
return
}
}
return
}
func (c *Chain) processInParallel(data []byte) (resp []byte, err error) {
var (
inbox = make(chan []byte) // This channel must be unbuffered
buf bytes.Buffer
wg sync.WaitGroup
)
for _, caller := range c.Links {
wg.Add(1)
go func() {
if res, err := caller.Call(data); err == nil {
inbox <- res
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(inbox)
}()
for {
if res, ok := <-inbox; ok {
buf.Write(res)
buf.WriteByte(LF) // Add linebreak
} else {
break
}
}
return buf.Bytes(), nil
}
func (c *Chain) processDelayed(data []byte) (resp []byte, err error) {
for _, caller := range c.Links {
go caller.Call(data)
}
return data, nil
}
-70
View File
@@ -1,70 +0,0 @@
package core
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"reflect"
)
func ParseChains() (map[string]*Chain, error) {
f, err := os.Open(Conf().ChainsConfig)
if err != nil {
panic("Failed to open chains config: " + Conf().ChainsConfig)
}
b, err := ioutil.ReadAll(f)
if err != nil {
panic("Failed to parse chains config: " + Conf().ChainsConfig)
}
var schema map[string]interface{}
if err := json.Unmarshal(b, &schema); err != nil {
return nil, err
}
chains := map[string]*Chain{}
for name, chain := range schema {
chains[name] = buildChain(interface{}(chain))
}
return chains, nil
}
func buildChain(conf interface{}) *Chain {
c := &Chain{
Links: []Caller{},
}
for f, links := range conf.(map[string]interface{}) {
if flow := FlowOf(f); flow != UnknownFlow {
c.Flow = flow
} else {
panic("Unknown chain flow: " + f)
}
for _, link := range links.([]interface{}) {
val := reflect.ValueOf(link)
switch val.Kind() {
case reflect.Map:
subchain := buildChain(interface{}(link))
if len(subchain.Links) > 0 {
c.Links = append(c.Links, Caller(subchain))
}
case reflect.String:
name := link.(string)
caller, ok := Units[name]
if !ok {
fmt.Println("Unknown unit:", name)
} else {
c.Links = append(c.Links, caller)
}
default:
panic("Unexpected chain element: " + val.Kind().String())
}
}
}
return c
}