1
0
Fork 0

Unit and chain banks

This commit is contained in:
Gregory Eremin 2015-02-11 19:27:51 +07:00
parent c96bc7ad8f
commit 9e472cc9a3
7 changed files with 117 additions and 89 deletions

8
app.go
View File

@ -17,8 +17,12 @@ func init() {
func main() {
core.InitConfig()
unit.LoadUnits(core.Conf().UnitsConfig)
chain.LoadChains(core.Conf().ChainsConfig)
ub := unit.NewBank(core.Conf().UnitsConfig)
ub.Reload()
cb := chain.NewBank(core.Conf().ChainsConfig, ub)
cb.Reload()
pretty.Println(core.Conf())

View File

@ -10,31 +10,49 @@ import (
"github.com/localhots/yeast/unit"
)
var (
chains = map[string]*Chain{}
type (
Bank struct {
config string
chains map[string]*Chain
units *unit.Bank
}
)
func LoadChains(path string) {
f, err := os.Open(path)
if err != nil {
panic("Failed to open chains config: " + path)
}
b, err := ioutil.ReadAll(f)
if err != nil {
panic("Failed to read chains config: " + path)
}
var schema map[string]interface{}
if err := json.Unmarshal(b, &schema); err != nil {
panic("Failed to parse chains config: " + path)
}
for name, c := range schema {
chains[name] = Parse(interface{}(c))
func NewBank(config string, units *unit.Bank) *Bank {
return &Bank{
config: config,
chains: map[string]*Chain{},
units: units,
}
}
func Parse(conf interface{}) *Chain {
func (b *Bank) Chain(name string) *Chain {
c, _ := b.chains[name]
return c
}
func (b *Bank) Reload() {
f, err := os.Open(b.config)
if err != nil {
panic("Failed to open chains config: " + b.config)
}
bs, err := ioutil.ReadAll(f)
if err != nil {
panic("Failed to read chains config: " + b.config)
}
var schema map[string]interface{}
if err := json.Unmarshal(bs, &schema); err != nil {
panic("Failed to parse chains config: " + b.config)
}
b.chains = map[string]*Chain{}
for name, c := range schema {
b.chains[name] = b.parse(interface{}(c))
}
}
func (b *Bank) parse(conf interface{}) *Chain {
c := &Chain{
Links: []unit.Caller{},
}
@ -51,13 +69,13 @@ func Parse(conf interface{}) *Chain {
switch val.Kind() {
case reflect.Map:
subchain := Parse(interface{}(link))
subchain := b.parse(interface{}(link))
if len(subchain.Links) > 0 {
c.Links = append(c.Links, unit.Caller(subchain))
}
case reflect.String:
name := link.(string)
if caller := unit.New(name); caller != nil {
if caller := b.units.Unit(name); caller != nil {
c.Links = append(c.Links, caller)
} else {
fmt.Println("Unknown unit:", name)

View File

@ -18,11 +18,6 @@ const (
LF = byte(10)
)
func New(name string) *Chain {
c, _ := chains[name]
return c
}
func (c *Chain) Call(data []byte) (resp []byte, err error) {
switch c.Flow {
case SequentialFlow:

View File

@ -5,8 +5,6 @@ import (
"os"
"os/exec"
"time"
"github.com/localhots/yeast/unit"
)
type (
@ -14,8 +12,8 @@ type (
)
// XXX: We're about to spawn hundreds of Python processes
func (s *Supervisor) StartAll() {
for _, name := range unit.Units() {
func (s *Supervisor) StartAll(units []string) {
for _, name := range units {
s.Start(name)
time.Sleep(500 * time.Millisecond) // Don't spawn processes too fast
}

69
unit/bank.go Normal file
View File

@ -0,0 +1,69 @@
package unit
import (
"encoding/json"
"io/ioutil"
"os"
"github.com/localhots/yeast/impl"
)
type (
Bank struct {
config string
units map[string]*Unit
}
)
func NewBank(config string) *Bank {
return &Bank{
config: config,
units: map[string]*Unit{},
}
}
func (b *Bank) Unit(name string) Caller {
if u, ok := b.units[name]; ok {
// Check for unit implementation and create a unit if there is none
if imp := impl.New(u.Impl); imp != nil {
return imp
} else {
return u
}
} else {
return nil
}
}
func (b *Bank) Reload() {
f, err := os.Open(b.config)
if err != nil {
panic("Failed to open units config: " + b.config)
}
bs, err := ioutil.ReadAll(f)
if err != nil {
panic("Failed to read units config: " + b.config)
}
var conf map[string]map[string]interface{}
if err := json.Unmarshal(bs, &conf); err != nil {
panic("Failed to parse units config: " + b.config)
}
b.units = map[string]*Unit{}
for name, meta := range conf {
b.units[name] = &Unit{
Name: name,
Impl: meta["impl"].(string),
Config: meta["config"],
}
}
}
func (b *Bank) Units() []string {
list := []string{}
for name, _ := range b.units {
list = append(list, name)
}
return list
}

View File

@ -1,41 +0,0 @@
package unit
import (
"encoding/json"
"io/ioutil"
"os"
)
var (
units = map[string]*Unit{}
)
func LoadUnits(path string) {
f, err := os.Open(path)
if err != nil {
panic("Failed to open units config: " + path)
}
b, err := ioutil.ReadAll(f)
if err != nil {
panic("Failed to parse units config: " + path)
}
var conf map[string]map[string]interface{}
json.Unmarshal(b, &conf)
for name, meta := range conf {
units[name] = &Unit{
Name: name,
Impl: meta["impl"].(string),
Config: meta["config"],
}
}
}
func Units() []string {
list := []string{}
for name, _ := range units {
list = append(list, name)
}
return list
}

View File

@ -5,8 +5,6 @@ import (
"fmt"
"net"
"strings"
"github.com/localhots/yeast/impl"
)
type (
@ -21,19 +19,6 @@ type (
}
)
func New(name string) Caller {
if u, ok := units[name]; ok {
// Check for unit implementation and create a unit if there is none
if imp := impl.New(u.Impl); imp != nil {
return imp
} else {
return u
}
} else {
return nil
}
}
func (u *Unit) Call(data []byte) (resp []byte, err error) {
var (
addr = &net.UnixAddr{u.socketPath(), "unix"}