Make more room for examples
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
# Shezmu example application
|
||||
|
||||
Describe the app here.
|
||||
@@ -0,0 +1,45 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/localhots/shezmu"
|
||||
)
|
||||
|
||||
// NumberPrinter is a daemon that prints numbers once in a while.
|
||||
type NumberPrinter struct {
|
||||
shezmu.BaseDaemon
|
||||
}
|
||||
|
||||
// Startup sets up panic handler and starts enqueuing number printing jobs.
|
||||
func (n *NumberPrinter) Startup() {
|
||||
n.HandlePanics(func(err interface{}) {
|
||||
n.Logf("Oh, crap! There was a panic, take a look: %v", err)
|
||||
})
|
||||
|
||||
n.LimitRate(3, time.Second)
|
||||
n.SystemProcess("Random Number Generator", n.generateNumbers)
|
||||
}
|
||||
|
||||
func (n *NumberPrinter) generateNumbers() {
|
||||
for n.Continue() {
|
||||
if rand.Intn(10) == 0 {
|
||||
panic("Number generator refuses to work right now!")
|
||||
}
|
||||
// Generate a random number between 1000 and 9999 and print it
|
||||
num := 1000 + rand.Intn(9000)
|
||||
n.Process(n.makeActor(num))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NumberPrinter) makeActor(num int) shezmu.Actor {
|
||||
return func() {
|
||||
n.Log("Number printer says:", num)
|
||||
|
||||
// Making it crash sometimes
|
||||
if num%10 == 0 {
|
||||
panic("Nooooo! Random number generator returned a multiple of ten!")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/localhots/shezmu"
|
||||
)
|
||||
|
||||
// PriceConsumer consumes price update messages and prints them to the console.
|
||||
type PriceConsumer struct {
|
||||
shezmu.BaseDaemon
|
||||
}
|
||||
|
||||
// PriceUpdate describes a price update message.
|
||||
type PriceUpdate struct {
|
||||
Product string `json:"product"`
|
||||
Amount float64 `json:"amount"`
|
||||
}
|
||||
|
||||
// Startup creates a new subscription for ProductPriceUpdates topic.
|
||||
func (p *PriceConsumer) Startup() {
|
||||
p.Subscribe("ProductPriceUpdates", func(u PriceUpdate) {
|
||||
p.Logf("Price for %q is now $%.2f", u.Product, u.Amount)
|
||||
})
|
||||
p.LimitRate(5, time.Second)
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/localhots/shezmu"
|
||||
)
|
||||
|
||||
// ConsumerState contains data that is required to create a Kafka consumer.
|
||||
type ConsumerState struct {
|
||||
Partition int32 `json:"partition"`
|
||||
Offset int64 `json:"offset"`
|
||||
}
|
||||
|
||||
// Subscriber is a dummy structure that implements shezmu.Subscriber interface.
|
||||
type Subscriber struct{}
|
||||
|
||||
// Stream is an implementation of shezmu.Stremer for Kafka messaging queue.
|
||||
type Stream struct {
|
||||
messages chan []byte
|
||||
shutdown chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
const (
|
||||
consumerStateFile = "tmp/consumers.json"
|
||||
)
|
||||
|
||||
var (
|
||||
kafkaClient sarama.Client
|
||||
kafkaConsumer sarama.Consumer
|
||||
consumers = map[string]map[string]ConsumerState{}
|
||||
)
|
||||
|
||||
// Initialize sets up the kafka package.
|
||||
func Initialize(brokers []string) {
|
||||
log.Println("Initializing Kafka")
|
||||
defer log.Println("Kafka is initialized")
|
||||
|
||||
conf := sarama.NewConfig()
|
||||
conf.ClientID = "Shezmu Example"
|
||||
|
||||
var err error
|
||||
if kafkaClient, err = sarama.NewClient(brokers, conf); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if kafkaConsumer, err = sarama.NewConsumerFromClient(kafkaClient); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
loadConsumerConfig()
|
||||
}
|
||||
|
||||
// Shutdown shuts down the kafka package.
|
||||
func Shutdown() {
|
||||
log.Println("Shutting down Kafka")
|
||||
defer log.Println("Kafka was shut down")
|
||||
|
||||
if err := kafkaConsumer.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := kafkaClient.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe creates a shezmu.Streamer implementation for Kafka messaging queue.
|
||||
func (s Subscriber) Subscribe(consumerName, topic string) shezmu.Streamer {
|
||||
c, ok := consumers[consumerName]
|
||||
if !ok {
|
||||
panic(fmt.Errorf("Consumer %q has no config", consumerName))
|
||||
}
|
||||
t, ok := c[topic]
|
||||
if !ok {
|
||||
panic(fmt.Errorf("Consumer %q has no config for topic %q", consumerName, topic))
|
||||
}
|
||||
|
||||
pc, err := kafkaConsumer.ConsumePartition(topic, t.Partition, t.Offset)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
stream := &Stream{
|
||||
messages: make(chan []byte),
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
go func() {
|
||||
stream.wg.Add(1)
|
||||
defer stream.wg.Done()
|
||||
defer pc.Close()
|
||||
for {
|
||||
select {
|
||||
case msg := <-pc.Messages():
|
||||
select {
|
||||
case stream.messages <- msg.Value:
|
||||
t.Offset = msg.Offset
|
||||
case <-stream.shutdown:
|
||||
return
|
||||
}
|
||||
case err := <-pc.Errors():
|
||||
log.Println("Kafka error:", err.Error())
|
||||
case <-stream.shutdown:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
// Messages returns a channel that stream messages.
|
||||
func (s *Stream) Messages() <-chan []byte {
|
||||
return s.messages
|
||||
}
|
||||
|
||||
// Close stops Kafka partition consumer.
|
||||
func (s *Stream) Close() {
|
||||
close(s.shutdown)
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func loadConsumerConfig() {
|
||||
if b, err := ioutil.ReadFile(consumerStateFile); err != nil {
|
||||
fmt.Println(`Kafka consumers state file was not found at ` + consumerStateFile + `
|
||||
Please create one in order to proceed with this example.
|
||||
Config file contents should look like this:
|
||||
{
|
||||
"ConsumerName": {
|
||||
"TopicName": {
|
||||
"partition": 0,
|
||||
"offset": 12345
|
||||
}
|
||||
}
|
||||
}`)
|
||||
os.Exit(1)
|
||||
} else {
|
||||
if err = json.Unmarshal(b, &consumers); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/localhots/shezmu"
|
||||
"github.com/localhots/shezmu/example/daemons"
|
||||
"github.com/localhots/shezmu/example/kafka"
|
||||
"github.com/localhots/shezmu/server"
|
||||
"github.com/localhots/shezmu/stats"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var brokers string
|
||||
|
||||
flag.StringVar(&brokers, "brokers", "127.0.0.1:9092", "Kafka broker addresses separated by space")
|
||||
flag.Parse()
|
||||
|
||||
kafka.Initialize(strings.Split(brokers, " "))
|
||||
defer kafka.Shutdown()
|
||||
|
||||
statsLogger := stats.NewStdoutLogger(0)
|
||||
defer statsLogger.Print()
|
||||
|
||||
statsServer := stats.NewServer()
|
||||
server := server.New(6464, statsServer)
|
||||
server.Start()
|
||||
|
||||
s := shezmu.Summon()
|
||||
s.Subscriber = kafka.Subscriber{}
|
||||
s.DaemonStats = stats.NewGroup(statsLogger, statsServer)
|
||||
|
||||
s.AddDaemon(&daemons.NumberPrinter{})
|
||||
s.AddDaemon(&daemons.PriceConsumer{})
|
||||
|
||||
s.StartDaemons()
|
||||
defer s.StopDaemons()
|
||||
|
||||
sig := make(chan os.Signal)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGHUP)
|
||||
switch <-sig {
|
||||
case syscall.SIGHUP:
|
||||
s.StopDaemons()
|
||||
s.StartDaemons()
|
||||
case syscall.SIGINT:
|
||||
return
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user