diff --git a/reader/enhanced_reader.go b/reader/enhanced_reader.go index 9127073..ece367f 100644 --- a/reader/enhanced_reader.go +++ b/reader/enhanced_reader.go @@ -4,26 +4,28 @@ import ( "context" "database/sql" + _ "github.com/localhots/mysql" // MySQL driver + "github.com/juju/errors" "github.com/localhots/bocadillo/binlog" "github.com/localhots/bocadillo/mysql" "github.com/localhots/bocadillo/reader/schema" "github.com/localhots/bocadillo/reader/slave" - _ "github.com/localhots/mysql" // MySQL driver ) // EnhancedReader is an extended version of the reader that maintains schema // details to add column names and signed integers support. type EnhancedReader struct { reader *Reader + safepoint binlog.Position schemaMgr *schema.Manager } // EnhancedRowsEvent ... type EnhancedRowsEvent struct { - Type binlog.EventType - Table binlog.TableDescription - Rows []map[string]interface{} + Header binlog.EventHeader + Table binlog.TableDescription + Rows []map[string]interface{} } // NewEnhanced creates a new enhanced binary log reader. @@ -41,6 +43,7 @@ func NewEnhanced(dsn string, sc slave.Config) (*EnhancedReader, error) { return &EnhancedReader{ reader: r, schemaMgr: schema.NewManager(conn), + safepoint: r.state, }, nil } @@ -121,9 +124,9 @@ func (r *EnhancedReader) nextRowsEvent() (*EnhancedRowsEvent, error) { } ere := EnhancedRowsEvent{ - Type: re.Type, - Table: *evt.Table, - Rows: make([]map[string]interface{}, len(re.Rows)), + Header: evt.Header, + Table: *evt.Table, + Rows: make([]map[string]interface{}, len(re.Rows)), } for i, row := range re.Rows { erow := make(map[string]interface{}, len(row)) @@ -145,11 +148,26 @@ func (r *EnhancedReader) nextRowsEvent() (*EnhancedRowsEvent, error) { } } +func (r *EnhancedReader) processEvent(evt Event) { + switch evt.Header.Type { + case binlog.EventTypeFormatDescription, binlog.EventTypeTableMap, binlog.EventTypeXID: + r.safepoint.Offset = evt.Offset + case binlog.EventTypeRotate: + r.safepoint = r.reader.state + } +} + // State returns current position in the binary log. func (r *EnhancedReader) State() binlog.Position { return r.reader.state } +// Safepoint returns last encountered position that is considered safe to start +// with. +func (r *EnhancedReader) Safepoint() binlog.Position { + return r.safepoint +} + // Close underlying database connection. func (r *EnhancedReader) Close() error { return r.reader.Close() diff --git a/reader/reader.go b/reader/reader.go index 57b6369..1fc05b8 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -19,11 +19,18 @@ type Event struct { Format binlog.FormatDescription Header binlog.EventHeader Buffer []byte + Offset uint64 // Table is not empty for rows events Table *binlog.TableDescription } +var ( + // ErrUnknownTableID is returned when a table ID from a rows event is + // missing in the table map index. + ErrUnknownTableID = errors.New("Unknown table ID") +) + // New creates a new binary log reader. func New(dsn string, sc slave.Config) (*Reader, error) { conn, err := slave.Connect(dsn, sc) @@ -112,7 +119,7 @@ func (r *Reader) ReadEvent() (*Event, error) { tableID, flags := re.PeekTableIDAndFlags(evt.Buffer, r.format) td, ok := r.tableMap[tableID] if !ok { - return nil, errors.New("Unknown table ID") + return nil, ErrUnknownTableID } evt.Table = &td