diff --git a/buffer.go b/buffer.go index 06b2248..e80d38c 100644 --- a/buffer.go +++ b/buffer.go @@ -1,6 +1,8 @@ package buffer import ( + "regexp" + influxdb "github.com/influxdb/influxdb/client" ) @@ -60,36 +62,41 @@ func (b *Buffer) Flush() { } // Searches for points of given series that matches provided conditions -func (b *Buffer) Lookup(series string, conds map[string]interface{}) (res *influxdb.Series) { - s, ok := b.series[series] - if !ok { +// All resulting series MUST have the same set of columns in the same order +func (b *Buffer) Lookup(pattern string, conds map[string]interface{}) (res map[string]*influxdb.Series, err error) { + res = make(map[string]*influxdb.Series) + + sers, err := b.matchSeries(pattern) + if err != nil || len(sers) == 0 { return } // Building reversed column index colind := make(map[string]int) - for i, name := range s.Columns { + for i, name := range sers[0].Columns { colind[name] = i } - for _, row := range s.Points { - good := true - for key, val := range conds { - ki, _ := colind[key] - if row[ki] != val { - good = false - } - } - if good { - // We need to return nil if there are no series/rows that matches condition - if res == nil { - res = &influxdb.Series{ - Name: s.Name, - Columns: s.Columns, - Points: [][]interface{}{}, + for _, s := range sers { + for _, row := range s.Points { + good := true + for key, val := range conds { + ki, _ := colind[key] + if row[ki] != val { + good = false } } - res.Points = append(res.Points, row) + if good { + _, ok := res[s.Name] + if !ok { + res[s.Name] = &influxdb.Series{ + Name: s.Name, + Columns: s.Columns, + Points: [][]interface{}{}, + } + } + res[s.Name].Points = append(res[s.Name].Points, row) + } } } @@ -125,3 +132,28 @@ func (b *Buffer) aggregate() { } } } + +func (b *Buffer) matchSeries(pattern string) (res []*influxdb.Series, err error) { + if len(pattern) > 2 && pattern[:1] == "/" && pattern[len(pattern)-1:] == "/" { + var reg *regexp.Regexp + + reg, err = regexp.Compile(pattern[1 : len(pattern)-1]) + if err != nil { + return + } + + for name, s := range b.series { + ok := reg.MatchString(name) + if ok { + res = append(res, s) + } + } + } else { + s, ok := b.series[pattern] + if ok { + res = append(res, s) + } + } + + return +} diff --git a/buffer_test.go b/buffer_test.go index ad5cf44..de5aa99 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -55,42 +55,101 @@ func TestFlush(t *testing.T) { func TestLookup(t *testing.T) { fn := func(series []*influxdb.Series) {} b := NewBuffer(10, fn) - b.Add(&influxdb.Series{ - Name: "foo", - Columns: []string{"a", "b", "c"}, - Points: [][]interface{}{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, - }) + b.Add( + &influxdb.Series{ + Name: "foo", + Columns: []string{"a", "b", "c"}, + Points: [][]interface{}{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + }, + &influxdb.Series{ + Name: "banana", + Columns: []string{"a", "b", "c"}, + Points: [][]interface{}{{11, 22, 33}}, + }, + ) - var res *influxdb.Series + // Got to wait for aggragation coroutine to finish + // XXX: Sleep is no good for testing. Need to come up with a better solution + time.Sleep(10 * time.Millisecond) + + var ( + res map[string]*influxdb.Series + err error + ) // Should not match inexistent series - res = b.Lookup("bar", map[string]interface{}{}) - if res != nil { - t.Error("Expected nil result, got non-nil") + res, err = b.Lookup("bar", map[string]interface{}{}) + if err != nil { + t.Error("Unexpected error") + } + if len(res) > 0 { + t.Errorf("Expected empty result, got %d series", len(res)) } // Should not match existent series with false condition - res = b.Lookup("bar", map[string]interface{}{"a": 2}) - if res != nil { - t.Error("Expected nil result, got non-nil") + res, err = b.Lookup("bar", map[string]interface{}{"a": 2}) + if err != nil { + t.Error("Unexpected error") + } + if len(res) > 0 { + t.Errorf("Expected empty result, got %d series", len(res)) } // Should match series with empty condition - res = b.Lookup("foo", map[string]interface{}{}) - if res == nil { - t.Error("Expected non-nil result, got nil") + res, err = b.Lookup("foo", map[string]interface{}{}) + if err != nil { + t.Error("Unexpected error") } - if len(res.Points) != 3 { - t.Errorf("Expected 3 resulting rows, got %d", len(res.Points)) + if len(res) != 1 { + t.Errorf("Expected result with 1 series, got %d", len(res)) + } + if len(res["foo"].Points) != 3 { + t.Errorf("Expected 3 points, got %d", len(res["foo"].Points)) } // Should match series with true condition - res = b.Lookup("foo", map[string]interface{}{"a": 1}) - if res == nil { - t.Error("Expected non-nil result, got nil") + res, err = b.Lookup("foo", map[string]interface{}{"a": 1}) + if err != nil { + t.Error("Unexpected error") } - if len(res.Points) != 1 { - t.Errorf("Expected 1 resulting rows, got %d", len(res.Points)) + if len(res) != 1 { + t.Errorf("Expected result with 1 series, got %d", len(res)) + } + if len(res["foo"].Points) != 1 { + t.Errorf("Expected 1 point, got %d", len(res["foo"].Points)) + } + + // Should match series with regexp + res, err = b.Lookup("/\\/", map[string]interface{}{}) + if err == nil { + t.Error("Expected error, got nil") + } + if len(res) > 0 { + t.Errorf("Expected empty result, got %d series", len(res)) + } + + res, err = b.Lookup("/oo$/", map[string]interface{}{}) + if err != nil { + t.Error("Unexpected error") + } + if len(res) != 1 { + t.Errorf("Expected result with 1 series, got %d", len(res)) + } + + res, err = b.Lookup("/.*/", map[string]interface{}{}) + if err != nil { + t.Error("Unexpected error") + } + if len(res) != 2 { + t.Errorf("Expected result with 2 series, got %d", len(res)) + } + + res, err = b.Lookup("/nothing/", map[string]interface{}{}) + if err != nil { + t.Error("Unexpected error") + } + if len(res) != 0 { + t.Errorf("Expected empty result, got %d series", len(res)) } }