From ef9874b6180fb32667505e08b04cd6ac784fe476 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Tue, 12 Aug 2014 14:20:33 +0700 Subject: [PATCH] Buffer lookup func --- buffer.go | 37 +++++++++++++++++++++++++++++++++++++ buffer_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/buffer.go b/buffer.go index 62ff214..06b2248 100644 --- a/buffer.go +++ b/buffer.go @@ -59,6 +59,43 @@ func (b *Buffer) Flush() { b.size = 0 } +// Searches for points of given series that matches provided conditions +func (b *Buffer) Lookup(series string, conds map[string]interface{}) (res *influxdb.Series) { + s, ok := b.series[series] + if !ok { + return + } + + // Building reversed column index + colind := make(map[string]int) + for i, name := range s.Columns { + colind[name] = i + } + + for _, row := range s.Points { + good := true + for key, val := range conds { + ki, _ := colind[key] + if row[ki] != val { + good = false + } + } + if good { + // We need to return nil if there are no series/rows that matches condition + if res == nil { + res = &influxdb.Series{ + Name: s.Name, + Columns: s.Columns, + Points: [][]interface{}{}, + } + } + res.Points = append(res.Points, row) + } + } + + return +} + // Closes buffer income channel and flushes all remaining series into database // It also terminates its aggregation coroutine func (b *Buffer) Close() { diff --git a/buffer_test.go b/buffer_test.go index 535806e..ad5cf44 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -52,6 +52,48 @@ func TestFlush(t *testing.T) { } } +func TestLookup(t *testing.T) { + fn := func(series []*influxdb.Series) {} + b := NewBuffer(10, fn) + b.Add(&influxdb.Series{ + Name: "foo", + Columns: []string{"a", "b", "c"}, + Points: [][]interface{}{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + }) + + var res *influxdb.Series + + // Should not match inexistent series + res = b.Lookup("bar", map[string]interface{}{}) + if res != nil { + t.Error("Expected nil result, got non-nil") + } + + // Should not match existent series with false condition + res = b.Lookup("bar", map[string]interface{}{"a": 2}) + if res != nil { + t.Error("Expected nil result, got non-nil") + } + + // Should match series with empty condition + res = b.Lookup("foo", map[string]interface{}{}) + if res == nil { + t.Error("Expected non-nil result, got nil") + } + if len(res.Points) != 3 { + t.Errorf("Expected 3 resulting rows, got %d", len(res.Points)) + } + + // Should match series with true condition + res = b.Lookup("foo", map[string]interface{}{"a": 1}) + if res == nil { + t.Error("Expected non-nil result, got nil") + } + if len(res.Points) != 1 { + t.Errorf("Expected 1 resulting rows, got %d", len(res.Points)) + } +} + func TestClose(t *testing.T) { fn := func(series []*influxdb.Series) {} b := NewBuffer(10, fn)