Added pattern matching
This commit is contained in:
		
							parent
							
								
									ef9874b618
								
							
						
					
					
						commit
						be5155b227
					
				
							
								
								
									
										72
									
								
								buffer.go
									
									
									
									
									
								
							
							
						
						
									
										72
									
								
								buffer.go
									
									
									
									
									
								
							| @ -1,6 +1,8 @@ | ||||
| package buffer | ||||
| 
 | ||||
| import ( | ||||
| 	"regexp" | ||||
| 
 | ||||
| 	influxdb "github.com/influxdb/influxdb/client" | ||||
| ) | ||||
| 
 | ||||
| @ -60,36 +62,41 @@ func (b *Buffer) Flush() { | ||||
| } | ||||
| 
 | ||||
| // Searches for points of given series that matches provided conditions | ||||
| func (b *Buffer) Lookup(series string, conds map[string]interface{}) (res *influxdb.Series) { | ||||
| 	s, ok := b.series[series] | ||||
| 	if !ok { | ||||
| // All resulting series MUST have the same set of columns in the same order | ||||
| func (b *Buffer) Lookup(pattern string, conds map[string]interface{}) (res map[string]*influxdb.Series, err error) { | ||||
| 	res = make(map[string]*influxdb.Series) | ||||
| 
 | ||||
| 	sers, err := b.matchSeries(pattern) | ||||
| 	if err != nil || len(sers) == 0 { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	// Building reversed column index | ||||
| 	colind := make(map[string]int) | ||||
| 	for i, name := range s.Columns { | ||||
| 	for i, name := range sers[0].Columns { | ||||
| 		colind[name] = i | ||||
| 	} | ||||
| 
 | ||||
| 	for _, row := range s.Points { | ||||
| 		good := true | ||||
| 		for key, val := range conds { | ||||
| 			ki, _ := colind[key] | ||||
| 			if row[ki] != val { | ||||
| 				good = false | ||||
| 			} | ||||
| 		} | ||||
| 		if good { | ||||
| 			// We need to return nil if there are no series/rows that matches condition | ||||
| 			if res == nil { | ||||
| 				res = &influxdb.Series{ | ||||
| 					Name:    s.Name, | ||||
| 					Columns: s.Columns, | ||||
| 					Points:  [][]interface{}{}, | ||||
| 	for _, s := range sers { | ||||
| 		for _, row := range s.Points { | ||||
| 			good := true | ||||
| 			for key, val := range conds { | ||||
| 				ki, _ := colind[key] | ||||
| 				if row[ki] != val { | ||||
| 					good = false | ||||
| 				} | ||||
| 			} | ||||
| 			res.Points = append(res.Points, row) | ||||
| 			if good { | ||||
| 				_, ok := res[s.Name] | ||||
| 				if !ok { | ||||
| 					res[s.Name] = &influxdb.Series{ | ||||
| 						Name:    s.Name, | ||||
| 						Columns: s.Columns, | ||||
| 						Points:  [][]interface{}{}, | ||||
| 					} | ||||
| 				} | ||||
| 				res[s.Name].Points = append(res[s.Name].Points, row) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| @ -125,3 +132,28 @@ func (b *Buffer) aggregate() { | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (b *Buffer) matchSeries(pattern string) (res []*influxdb.Series, err error) { | ||||
| 	if len(pattern) > 2 && pattern[:1] == "/" && pattern[len(pattern)-1:] == "/" { | ||||
| 		var reg *regexp.Regexp | ||||
| 
 | ||||
| 		reg, err = regexp.Compile(pattern[1 : len(pattern)-1]) | ||||
| 		if err != nil { | ||||
| 			return | ||||
| 		} | ||||
| 
 | ||||
| 		for name, s := range b.series { | ||||
| 			ok := reg.MatchString(name) | ||||
| 			if ok { | ||||
| 				res = append(res, s) | ||||
| 			} | ||||
| 		} | ||||
| 	} else { | ||||
| 		s, ok := b.series[pattern] | ||||
| 		if ok { | ||||
| 			res = append(res, s) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return | ||||
| } | ||||
|  | ||||
							
								
								
									
										103
									
								
								buffer_test.go
									
									
									
									
									
								
							
							
						
						
									
										103
									
								
								buffer_test.go
									
									
									
									
									
								
							| @ -55,42 +55,101 @@ func TestFlush(t *testing.T) { | ||||
| func TestLookup(t *testing.T) { | ||||
| 	fn := func(series []*influxdb.Series) {} | ||||
| 	b := NewBuffer(10, fn) | ||||
| 	b.Add(&influxdb.Series{ | ||||
| 		Name:    "foo", | ||||
| 		Columns: []string{"a", "b", "c"}, | ||||
| 		Points:  [][]interface{}{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, | ||||
| 	}) | ||||
| 	b.Add( | ||||
| 		&influxdb.Series{ | ||||
| 			Name:    "foo", | ||||
| 			Columns: []string{"a", "b", "c"}, | ||||
| 			Points:  [][]interface{}{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, | ||||
| 		}, | ||||
| 		&influxdb.Series{ | ||||
| 			Name:    "banana", | ||||
| 			Columns: []string{"a", "b", "c"}, | ||||
| 			Points:  [][]interface{}{{11, 22, 33}}, | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	var res *influxdb.Series | ||||
| 	// Got to wait for aggragation coroutine to finish | ||||
| 	// XXX: Sleep is no good for testing. Need to come up with a better solution | ||||
| 	time.Sleep(10 * time.Millisecond) | ||||
| 
 | ||||
| 	var ( | ||||
| 		res map[string]*influxdb.Series | ||||
| 		err error | ||||
| 	) | ||||
| 
 | ||||
| 	// Should not match inexistent series | ||||
| 	res = b.Lookup("bar", map[string]interface{}{}) | ||||
| 	if res != nil { | ||||
| 		t.Error("Expected nil result, got non-nil") | ||||
| 	res, err = b.Lookup("bar", map[string]interface{}{}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res) > 0 { | ||||
| 		t.Errorf("Expected empty result, got %d series", len(res)) | ||||
| 	} | ||||
| 
 | ||||
| 	// Should not match existent series with false condition | ||||
| 	res = b.Lookup("bar", map[string]interface{}{"a": 2}) | ||||
| 	if res != nil { | ||||
| 		t.Error("Expected nil result, got non-nil") | ||||
| 	res, err = b.Lookup("bar", map[string]interface{}{"a": 2}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res) > 0 { | ||||
| 		t.Errorf("Expected empty result, got %d series", len(res)) | ||||
| 	} | ||||
| 
 | ||||
| 	// Should match series with empty condition | ||||
| 	res = b.Lookup("foo", map[string]interface{}{}) | ||||
| 	if res == nil { | ||||
| 		t.Error("Expected non-nil result, got nil") | ||||
| 	res, err = b.Lookup("foo", map[string]interface{}{}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res.Points) != 3 { | ||||
| 		t.Errorf("Expected 3 resulting rows, got %d", len(res.Points)) | ||||
| 	if len(res) != 1 { | ||||
| 		t.Errorf("Expected result with 1 series, got %d", len(res)) | ||||
| 	} | ||||
| 	if len(res["foo"].Points) != 3 { | ||||
| 		t.Errorf("Expected 3 points, got %d", len(res["foo"].Points)) | ||||
| 	} | ||||
| 
 | ||||
| 	// Should match series with true condition | ||||
| 	res = b.Lookup("foo", map[string]interface{}{"a": 1}) | ||||
| 	if res == nil { | ||||
| 		t.Error("Expected non-nil result, got nil") | ||||
| 	res, err = b.Lookup("foo", map[string]interface{}{"a": 1}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res.Points) != 1 { | ||||
| 		t.Errorf("Expected 1 resulting rows, got %d", len(res.Points)) | ||||
| 	if len(res) != 1 { | ||||
| 		t.Errorf("Expected result with 1 series, got %d", len(res)) | ||||
| 	} | ||||
| 	if len(res["foo"].Points) != 1 { | ||||
| 		t.Errorf("Expected 1 point, got %d", len(res["foo"].Points)) | ||||
| 	} | ||||
| 
 | ||||
| 	// Should match series with regexp | ||||
| 	res, err = b.Lookup("/\\/", map[string]interface{}{}) | ||||
| 	if err == nil { | ||||
| 		t.Error("Expected error, got nil") | ||||
| 	} | ||||
| 	if len(res) > 0 { | ||||
| 		t.Errorf("Expected empty result, got %d series", len(res)) | ||||
| 	} | ||||
| 
 | ||||
| 	res, err = b.Lookup("/oo$/", map[string]interface{}{}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res) != 1 { | ||||
| 		t.Errorf("Expected result with 1 series, got %d", len(res)) | ||||
| 	} | ||||
| 
 | ||||
| 	res, err = b.Lookup("/.*/", map[string]interface{}{}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res) != 2 { | ||||
| 		t.Errorf("Expected result with 2 series, got %d", len(res)) | ||||
| 	} | ||||
| 
 | ||||
| 	res, err = b.Lookup("/nothing/", map[string]interface{}{}) | ||||
| 	if err != nil { | ||||
| 		t.Error("Unexpected error") | ||||
| 	} | ||||
| 	if len(res) != 0 { | ||||
| 		t.Errorf("Expected empty result, got %d series", len(res)) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user