Wait for Kafka partition to shutdown

This commit is contained in:
Gregory Eremin 2015-10-18 03:45:25 +03:00
parent ef17093207
commit 787156d3ba
3 changed files with 14 additions and 5 deletions

View File

@ -19,7 +19,7 @@ func (n *NumberPrinter) Startup() {
log.Printf("Oh, crap! There was a panic, take a look: %v", err) log.Printf("Oh, crap! There was a panic, take a look: %v", err)
}) })
n.LimitRate(1, 2*time.Second) n.LimitRate(3, 1*time.Second)
n.SystemProcess("Random Number Generator", n.generateNumbers) n.SystemProcess("Random Number Generator", n.generateNumbers)
} }

View File

@ -19,10 +19,10 @@ type PriceUpdate struct {
// Startup creates a new subscription for ProductPriceUpdates topic. // Startup creates a new subscription for ProductPriceUpdates topic.
func (p *PriceConsumer) Startup() { func (p *PriceConsumer) Startup() {
p.LimitRate(1, 500*time.Millisecond)
b.Subscribe("ProductPriceUpdates", func(u PriceUpdate) { b.Subscribe("ProductPriceUpdates", func(u PriceUpdate) {
log.Printf("Price for %q is now $%.2f", u.Product, u.Amount) log.Printf("Price for %q is now $%.2f", u.Product, u.Amount)
}) })
p.LimitRate(5, 1*time.Second)
} }
// Shutdown is empty because PriceConsumer requires no cleanup upon exiting. // Shutdown is empty because PriceConsumer requires no cleanup upon exiting.

View File

@ -6,6 +6,7 @@ import (
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
"sync"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/localhots/satan" "github.com/localhots/satan"
@ -21,6 +22,7 @@ type ConsumerState struct {
type Stream struct { type Stream struct {
messages chan []byte messages chan []byte
shutdown chan struct{} shutdown chan struct{}
wg sync.WaitGroup
} }
const ( const (
@ -80,15 +82,21 @@ func Subscribe(consumer, topic string) satan.Streamer {
shutdown: make(chan struct{}), shutdown: make(chan struct{}),
} }
go func() { go func() {
stream.wg.Add(1)
defer stream.wg.Done()
defer pc.Close()
for { for {
select { select {
case msg := <-pc.Messages(): case msg := <-pc.Messages():
stream.messages <- msg.Value select {
t.Offset = msg.Offset case stream.messages <- msg.Value:
t.Offset = msg.Offset
case <-stream.shutdown:
return
}
case err := <-pc.Errors(): case err := <-pc.Errors():
log.Println("Kafka error:", err.Error()) log.Println("Kafka error:", err.Error())
case <-stream.shutdown: case <-stream.shutdown:
pc.Close()
return return
} }
} }
@ -105,6 +113,7 @@ func (s *Stream) Messages() <-chan []byte {
// Close stops Kafka partition consumer. // Close stops Kafka partition consumer.
func (s *Stream) Close() { func (s *Stream) Close() {
close(s.shutdown) close(s.shutdown)
s.wg.Wait()
} }
func loadConsumerConfig() { func loadConsumerConfig() {