From 7e08b8e8307626fef9f3bfe2c14238117ba5217b Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Mon, 12 Nov 2018 15:13:28 +0100 Subject: [PATCH] New enhanced reader with column names and integer signing --- reader/enhanced_reader.go | 174 ++++++++++++++++++++++++++++++++++++++ reader/reader.go | 2 - 2 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 reader/enhanced_reader.go diff --git a/reader/enhanced_reader.go b/reader/enhanced_reader.go new file mode 100644 index 0000000..9127073 --- /dev/null +++ b/reader/enhanced_reader.go @@ -0,0 +1,174 @@ +package reader + +import ( + "context" + "database/sql" + + "github.com/juju/errors" + "github.com/localhots/bocadillo/binlog" + "github.com/localhots/bocadillo/mysql" + "github.com/localhots/bocadillo/reader/schema" + "github.com/localhots/bocadillo/reader/slave" + _ "github.com/localhots/mysql" // MySQL driver +) + +// EnhancedReader is an extended version of the reader that maintains schema +// details to add column names and signed integers support. +type EnhancedReader struct { + reader *Reader + schemaMgr *schema.Manager +} + +// EnhancedRowsEvent ... +type EnhancedRowsEvent struct { + Type binlog.EventType + Table binlog.TableDescription + Rows []map[string]interface{} +} + +// NewEnhanced creates a new enhanced binary log reader. +func NewEnhanced(dsn string, sc slave.Config) (*EnhancedReader, error) { + r, err := New(dsn, sc) + if err != nil { + return nil, err + } + + conn, err := sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + + return &EnhancedReader{ + reader: r, + schemaMgr: schema.NewManager(conn), + }, nil +} + +// WhitelistTables adds given tables of the given database to processing white +// list. +func (r *EnhancedReader) WhitelistTables(database string, tables ...string) error { + for _, tbl := range tables { + if err := r.schemaMgr.Manage(database, tbl); err != nil { + return err + } + } + return nil +} + +// ReadEvent reads next event from the binary log. +func (r *EnhancedReader) ReadEvent() (*Event, error) { + evt, err := r.reader.ReadEvent() + if err != nil { + return nil, err + } + + switch evt.Header.Type { + case binlog.EventTypeQuery: + var qe binlog.QueryEvent + qe.Decode(evt.Buffer) + err = r.schemaMgr.ProcessQuery(string(qe.Schema), string(qe.Query)) + } + + return evt, err +} + +// NextRowsEvent returns the next rows event for a whitelisted table. It blocks +// until next event is received or context is cancelled. +func (r *EnhancedReader) NextRowsEvent(ctx context.Context) (*EnhancedRowsEvent, error) { + evtch := make(chan *EnhancedRowsEvent) + errch := make(chan error) + go func() { + evt, err := r.nextRowsEvent() + if err != nil { + errch <- err + } else { + evtch <- evt + } + }() + + select { + case evt := <-evtch: + return evt, nil + case err := <-errch: + return nil, err + case <-ctx.Done(): + return nil, nil + } +} + +func (r *EnhancedReader) nextRowsEvent() (*EnhancedRowsEvent, error) { + for { + evt, err := r.reader.ReadEvent() + if err != nil { + return nil, err + } + + // Check if it's a rows event + if binlog.RowsEventVersion(evt.Header.Type) < 0 { + // fmt.Println("Not a rows event", evt.Header.Type.String()) + continue + } + + tbl := r.schemaMgr.Schema.Table(evt.Table.SchemaName, evt.Table.TableName) + if tbl == nil { + // Not whitelisted + continue + } + + re, err := evt.DecodeRows() + if err != nil { + return nil, err + } + + ere := EnhancedRowsEvent{ + Type: re.Type, + Table: *evt.Table, + Rows: make([]map[string]interface{}, len(re.Rows)), + } + for i, row := range re.Rows { + erow := make(map[string]interface{}, len(row)) + for j, val := range row { + col := tbl.Column(j) + if col == nil { + return nil, errors.New("column index undefined") + } + ct := mysql.ColumnType(evt.Table.ColumnTypes[j]) + if !col.Unsigned { + val = signNumber(val, ct) + } + erow[col.Name] = val + } + ere.Rows[i] = erow + } + + return &ere, nil + } +} + +// State returns current position in the binary log. +func (r *EnhancedReader) State() binlog.Position { + return r.reader.state +} + +// Close underlying database connection. +func (r *EnhancedReader) Close() error { + return r.reader.Close() +} + +func signNumber(val interface{}, ct mysql.ColumnType) interface{} { + switch tval := val.(type) { + case uint8: + return mysql.SignUint8(tval) + case uint16: + return mysql.SignUint16(tval) + case uint32: + if ct == mysql.ColumnTypeInt24 { + return mysql.SignUint24(tval) + } + return mysql.SignUint32(tval) + case uint64: + return mysql.SignUint64(tval) + default: + return val + } +} diff --git a/reader/reader.go b/reader/reader.go index f939019..57b6369 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -3,7 +3,6 @@ package reader import ( "github.com/juju/errors" "github.com/localhots/bocadillo/binlog" - "github.com/localhots/bocadillo/reader/schema" "github.com/localhots/bocadillo/reader/slave" ) @@ -13,7 +12,6 @@ type Reader struct { state binlog.Position format binlog.FormatDescription tableMap map[uint64]binlog.TableDescription - schema *schema.Schema } // Event contains binlog event details.