231 lines
5.2 KiB
Go
231 lines
5.2 KiB
Go
package buffer
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
influxdb "github.com/influxdb/influxdb/client"
|
|
)
|
|
|
|
func TestNewBuffer(t *testing.T) {
|
|
fn := func(series []*influxdb.Series) {}
|
|
b := NewBuffer(10, fn)
|
|
defer b.Close()
|
|
|
|
if b == nil {
|
|
t.Error("Failed to instantiate buffer with `NewBuffer` function")
|
|
}
|
|
}
|
|
|
|
func TestAdd(t *testing.T) {
|
|
fn := func(series []*influxdb.Series) {}
|
|
b := NewBuffer(10, fn)
|
|
defer b.Close()
|
|
|
|
if b.Size() != 0 {
|
|
t.Error("Freshly created buffer is not empty")
|
|
}
|
|
|
|
b.Add(&influxdb.Series{})
|
|
|
|
if b.Size() != 1 {
|
|
t.Error("Adding series to buffer does not increment size")
|
|
}
|
|
}
|
|
|
|
func TestFlush(t *testing.T) {
|
|
fn := func(series []*influxdb.Series) {}
|
|
b := NewBuffer(1, fn)
|
|
defer b.Close()
|
|
b.Add(&influxdb.Series{})
|
|
|
|
if b.Size() != 0 {
|
|
t.Error("Flushing buffer does not make it empty again")
|
|
}
|
|
}
|
|
|
|
func TestClear(t *testing.T) {
|
|
fn := func(series []*influxdb.Series) {}
|
|
b := NewBuffer(10, fn)
|
|
defer b.Close()
|
|
b.Add(&influxdb.Series{})
|
|
|
|
if b.Size() != 1 {
|
|
t.Error("Expected buffer to contain 1 series before clearing, got %d", b.Size())
|
|
}
|
|
b.Clear()
|
|
if b.Size() != 0 {
|
|
t.Error("Expected buffer to be empty after clearing, got %d series", b.Size())
|
|
}
|
|
}
|
|
|
|
func TestLookup(t *testing.T) {
|
|
fn := func(series []*influxdb.Series) {}
|
|
b := NewBuffer(10, fn)
|
|
defer b.Close()
|
|
b.Add(
|
|
&influxdb.Series{
|
|
Name: "foo",
|
|
Columns: []string{"a", "b", "c"},
|
|
Points: [][]interface{}{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}},
|
|
},
|
|
&influxdb.Series{
|
|
Name: "banana",
|
|
Columns: []string{"a", "b", "c"},
|
|
Points: [][]interface{}{{11, 22, 33}},
|
|
},
|
|
)
|
|
|
|
// Got to wait for aggragation coroutine to finish
|
|
// XXX: Sleep is no good for testing. Need to come up with a better solution
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
var (
|
|
res map[string]*influxdb.Series
|
|
err error
|
|
)
|
|
|
|
// Should not match inexistent series
|
|
res, err = b.Lookup("bar", map[string]interface{}{})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) > 0 {
|
|
t.Errorf("Expected empty result, got %d series", len(res))
|
|
}
|
|
|
|
// Should not match existent series with false condition
|
|
res, err = b.Lookup("bar", map[string]interface{}{"a": 2})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) > 0 {
|
|
t.Errorf("Expected empty result, got %d series", len(res))
|
|
}
|
|
|
|
// Should match series with empty condition
|
|
res, err = b.Lookup("foo", map[string]interface{}{})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) != 1 {
|
|
t.Errorf("Expected result with 1 series, got %d", len(res))
|
|
}
|
|
if len(res["foo"].Points) != 3 {
|
|
t.Errorf("Expected 3 points, got %d", len(res["foo"].Points))
|
|
}
|
|
|
|
// Should match series with true condition
|
|
res, err = b.Lookup("foo", map[string]interface{}{"a": 1})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) != 1 {
|
|
t.Errorf("Expected result with 1 series, got %d", len(res))
|
|
}
|
|
if len(res["foo"].Points) != 1 {
|
|
t.Errorf("Expected 1 point, got %d", len(res["foo"].Points))
|
|
}
|
|
|
|
// Should match series with regexp
|
|
res, err = b.Lookup("/\\/", map[string]interface{}{})
|
|
if err == nil {
|
|
t.Error("Expected error, got nil")
|
|
}
|
|
if len(res) > 0 {
|
|
t.Errorf("Expected empty result, got %d series", len(res))
|
|
}
|
|
|
|
res, err = b.Lookup("/oo$/", map[string]interface{}{})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) != 1 {
|
|
t.Errorf("Expected result with 1 series, got %d", len(res))
|
|
}
|
|
|
|
res, err = b.Lookup("/.*/", map[string]interface{}{})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) != 2 {
|
|
t.Errorf("Expected result with 2 series, got %d", len(res))
|
|
}
|
|
|
|
res, err = b.Lookup("/nothing/", map[string]interface{}{})
|
|
if err != nil {
|
|
t.Error("Unexpected error")
|
|
}
|
|
if len(res) != 0 {
|
|
t.Errorf("Expected empty result, got %d series", len(res))
|
|
}
|
|
}
|
|
|
|
func TestClose(t *testing.T) {
|
|
fn := func(series []*influxdb.Series) {}
|
|
b := NewBuffer(10, fn)
|
|
b.Add(&influxdb.Series{})
|
|
|
|
b.Close()
|
|
|
|
if b.Size() != 0 {
|
|
t.Error("Buffer was not flushed before closing")
|
|
}
|
|
|
|
defer func() {
|
|
if recover() == nil {
|
|
t.Error("No panic was caused by adding series to a closed buffer")
|
|
}
|
|
}()
|
|
b.Add(&influxdb.Series{})
|
|
}
|
|
|
|
func TestAggregate(t *testing.T) {
|
|
res := make(chan []*influxdb.Series, 1)
|
|
fn := func(series []*influxdb.Series) { res <- series }
|
|
b := NewBuffer(3, fn)
|
|
defer b.Close()
|
|
b.Add(
|
|
&influxdb.Series{
|
|
Name: "foo",
|
|
Columns: []string{"bar", "baz"},
|
|
Points: [][]interface{}{{1, 2}},
|
|
},
|
|
&influxdb.Series{
|
|
Name: "banana",
|
|
Columns: []string{"inevitable", "sadness"},
|
|
Points: [][]interface{}{{"every", "day"}},
|
|
},
|
|
&influxdb.Series{
|
|
Name: "foo",
|
|
Columns: []string{"bar", "baz"},
|
|
Points: [][]interface{}{{3, 4}},
|
|
},
|
|
)
|
|
|
|
var series []*influxdb.Series
|
|
select {
|
|
case series = <-res:
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("Flushing did not happen")
|
|
}
|
|
|
|
if len(series) != 2 {
|
|
t.Errorf("Expected to recieve 2 aggregated series, not %d", len(series))
|
|
}
|
|
for _, ser := range series {
|
|
switch ser.Name {
|
|
case "foo":
|
|
if len(ser.Points) != 2 {
|
|
t.Errorf("Expected to recieve 2 aggregated points for series `foo`, not %d", len(ser.Points))
|
|
}
|
|
case "banana":
|
|
if len(ser.Points) != 1 {
|
|
t.Errorf("Expected to recieve 1 aggregated points for series `banana`, not %d", len(ser.Points))
|
|
}
|
|
default:
|
|
t.Errorf("Unexpected series name: %s", ser.Name)
|
|
}
|
|
}
|
|
}
|