
Initial commit

Gregory Eremin 2014-08-06 15:19:36 +07:00
commit 16deef7fe6
GPG Key ID: 5EFA427EEC26E86C
3 changed files with 265 additions and 0 deletions

README.md Normal file

@@ -0,0 +1,51 @@
# InfluxDB aggregated buffer
In order to achieve maximum write performance with InfluxDB, an application should pack as many points as possible into each series and as many series as possible into each batch.
This buffer does exactly that: it aggregates multiple points into a single series and multiple series into a single batch. When the buffer is full, it flushes the accumulated series to the database.
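For instance, adding two series with the same name produces a single series whose points include both rows by the time the flush function is called. An abridged sketch (the series name and columns are made up):
```go
buf := buffer.New(2, func(series []*influxdb.Series) {
    // Called with one series named "app.events" carrying two point rows.
})
buf.Add(
    &influxdb.Series{Name: "app.events", Columns: []string{"kind"}, Points: [][]interface{}{{"signup"}}},
    &influxdb.Series{Name: "app.events", Columns: []string{"kind"}, Points: [][]interface{}{{"login"}}},
)
```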
## Example:
```go
package main

import (
    "net/http"

    influxdb "github.com/influxdb/influxdb/client"
    buffer "github.com/localhots/influxdb_buffer"
)

func handleMetrics(rw http.ResponseWriter, req *http.Request) {
    ua := parseUserAgent(req.UserAgent())
    geo := fetchGeoInfo(req.RemoteAddr)

    buf.Add(
        &influxdb.Series{
            Name:    "app.browsers",
            Columns: []string{"vendor", "version", "mobile"},
            Points:  [][]interface{}{{ua.Vendor, ua.Version, ua.IsMobile}},
        },
        &influxdb.Series{
            Name:    "app.visitors.geo",
            Columns: []string{"ip", "country", "city", "isp"},
            Points:  [][]interface{}{{req.RemoteAddr, geo.Country, geo.City, geo.Isp}},
        },
    )
}

func writeMetrics(series []*influxdb.Series) {
    // db is a configured *influxdb.Client; its setup is omitted here
    db.WriteSeriesWithTimePrecision(series, "s")
}

var (
    buf *buffer.Buffer
    db  *influxdb.Client
)

func main() {
    buf = buffer.New(100, writeMetrics)

    http.HandleFunc("/metrics", handleMetrics)
    http.ListenAndServe(":8080", nil)
}
```
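
The buffer flushes on its own only when it reaches capacity, so call `Close` on shutdown to push out a partially filled buffer. A minimal sketch extending the example above (the signal handling is just one way to do it):
```go
func main() {
    buf = buffer.New(100, writeMetrics)

    http.HandleFunc("/metrics", handleMetrics)
    go http.ListenAndServe(":8080", nil)

    // Block until interrupted, then flush whatever is still buffered.
    // Note that the flush function runs in its own goroutine, so a real
    // shutdown path may want to wait for it to finish before exiting.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    <-sig
    buf.Close()
}
```
This variant also needs `os` and `os/signal` in the import list.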

buffer.go Normal file

@@ -0,0 +1,90 @@
package buffer

import (
    influxdb "github.com/influxdb/influxdb/client"
)

type (
    Buffer struct {
        fn       func(series []*influxdb.Series)
        in       chan *influxdb.Series
        series   map[string]*influxdb.Series
        size     int
        capacity int
    }
)

// New is an alias for the NewBuffer function
func New(capacity int, fn func(series []*influxdb.Series)) *Buffer {
    return NewBuffer(capacity, fn)
}

// NewBuffer creates a new buffer with the given capacity and flush function
// It also starts the aggregation loop in a goroutine
func NewBuffer(capacity int, fn func(series []*influxdb.Series)) *Buffer {
    b := &Buffer{
        fn:       fn,
        in:       make(chan *influxdb.Series),
        series:   make(map[string]*influxdb.Series),
        capacity: capacity,
    }
    go b.aggregate()

    return b
}

// Size returns the current buffer size
// 0 <= size <= capacity
func (b *Buffer) Size() int {
    return b.size
}

// Add adds one or more series to the buffer
func (b *Buffer) Add(series ...*influxdb.Series) {
    for _, item := range series {
        b.in <- item
    }
}

// Flush sends the aggregated series to the database
func (b *Buffer) Flush() {
    sbuffer := []*influxdb.Series{}
    for _, item := range b.series {
        sbuffer = append(sbuffer, item)
    }
    go b.fn(sbuffer)

    b.series = make(map[string]*influxdb.Series)
    b.size = 0
}

// Close closes the buffer's incoming channel and flushes all remaining series
// into the database. It also terminates the aggregation goroutine
func (b *Buffer) Close() {
    close(b.in)
    b.Flush()
}
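
// aggregate receives series from the in channel, merges points of series
// that share the same name, and flushes once the buffer reaches capacity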
func (b *Buffer) aggregate() {
    for item := range b.in {
        if existing, ok := b.series[item.Name]; ok {
            existing.Points = append(existing.Points, item.Points...)
        } else {
            b.series[item.Name] = item
        }

        b.size++
        if b.size == b.capacity {
            b.Flush()
        }
    }
}

buffer_test.go Normal file

@@ -0,0 +1,124 @@
package buffer

import (
    "testing"
    "time"

    influxdb "github.com/influxdb/influxdb/client"
)

func TestNewBuffer(t *testing.T) {
    fn := func(series []*influxdb.Series) {}
    b := NewBuffer(10, fn)
    if b == nil {
        t.Error("Failed to instantiate buffer with `NewBuffer` function")
    }
}

func TestAdd(t *testing.T) {
    fn := func(series []*influxdb.Series) {}
    b := NewBuffer(10, fn)
    if b.Size() != 0 {
        t.Error("Freshly created buffer is not empty")
    }

    b.Add(&influxdb.Series{})
    if b.Size() != 1 {
        t.Error("Adding series to buffer does not increment size")
    }
}
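
// TestFlush verifies that the buffer invokes the flush function once it
// reaches capacity and that flushing empties the buffer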
func TestFlush(t *testing.T) {
    res := make(chan []*influxdb.Series, 1)
    fn := func(series []*influxdb.Series) {
        res <- series
    }
    b := NewBuffer(1, fn)
    b.Add(&influxdb.Series{})

    timer := time.NewTimer(time.Second)
    select {
    case <-res:
    case <-timer.C:
        t.Error("Flushing did not happen")
    }

    if b.Size() != 0 {
        t.Error("Flushing buffer does not make it empty again")
    }
}
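
// TestClose verifies that closing the buffer flushes the remaining series and
// that adding to a closed buffer panics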
func TestClose(t *testing.T) {
    fn := func(series []*influxdb.Series) {}
    b := NewBuffer(10, fn)
    b.Add(&influxdb.Series{})
    b.Close()
    if b.Size() != 0 {
        t.Error("Buffer was not flushed before closing")
    }

    defer func() {
        if recover() == nil {
            t.Error("No panic was caused by adding series to a closed buffer")
        }
    }()
    b.Add(&influxdb.Series{})
}
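
// TestAggregate verifies that series sharing a name are merged into a single
// series containing all of their points before being flushed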
func TestAggregate(t *testing.T) {
    res := make(chan []*influxdb.Series, 1)
    fn := func(series []*influxdb.Series) {
        res <- series
    }
    b := NewBuffer(3, fn)
    b.Add(
        &influxdb.Series{
            Name:    "foo",
            Columns: []string{"bar", "baz"},
            Points:  [][]interface{}{{1, 2}},
        },
        &influxdb.Series{
            Name:    "banana",
            Columns: []string{"inevitable", "sadness"},
            Points:  [][]interface{}{{"every", "day"}},
        },
        &influxdb.Series{
            Name:    "foo",
            Columns: []string{"bar", "baz"},
            Points:  [][]interface{}{{3, 4}},
        },
    )

    timer := time.NewTimer(time.Second)
    var series []*influxdb.Series
    select {
    case series = <-res:
    case <-timer.C:
        t.Error("Flushing did not happen")
    }

    if len(series) != 2 {
        t.Errorf("Expected to receive 2 aggregated series, not %d", len(series))
    }
    for _, ser := range series {
        switch ser.Name {
        case "foo":
            if len(ser.Points) != 2 {
                t.Errorf("Expected to receive 2 aggregated points for series `foo`, not %d", len(ser.Points))
            }
        case "banana":
            if len(ser.Points) != 1 {
                t.Errorf("Expected to receive 1 aggregated point for series `banana`, not %d", len(ser.Points))
            }
        default:
            t.Errorf("Unexpected series name: %s", ser.Name)
        }
    }
}