171 lines
4.0 KiB
Go
171 lines
4.0 KiB
Go
package reader
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
|
|
_ "github.com/go-sql-driver/mysql" // MySQL driver
|
|
|
|
"github.com/juju/errors"
|
|
"github.com/localhots/bocadillo/binlog"
|
|
"github.com/localhots/bocadillo/mysql"
|
|
"github.com/localhots/bocadillo/mysql/driver"
|
|
"github.com/localhots/bocadillo/reader/schema"
|
|
)
|
|
|
|
// EnhancedReader is an extended version of the reader that maintains schema
|
|
// details to add column names and signed integers support.
|
|
type EnhancedReader struct {
|
|
reader *Reader
|
|
safepoint binlog.Position
|
|
schemaMgr *schema.Manager
|
|
}
|
|
|
|
// EnhancedRowsEvent ...
|
|
type EnhancedRowsEvent struct {
|
|
Header binlog.EventHeader
|
|
Table binlog.TableDescription
|
|
Rows []map[string]interface{}
|
|
}
|
|
|
|
// NewEnhanced creates a new enhanced binary log reader.
|
|
func NewEnhanced(dsn string, sc driver.Config) (*EnhancedReader, error) {
|
|
r, err := New(dsn, sc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
conn, err := sql.Open("mysql", dsn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &EnhancedReader{
|
|
reader: r,
|
|
schemaMgr: schema.NewManager(conn),
|
|
safepoint: r.state,
|
|
}, nil
|
|
}
|
|
|
|
// WhitelistTables adds given tables of the given database to processing white
|
|
// list.
|
|
func (r *EnhancedReader) WhitelistTables(database string, tables ...string) error {
|
|
for _, tbl := range tables {
|
|
if err := r.schemaMgr.Manage(database, tbl); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReadEvent reads next event from the binary log.
|
|
func (r *EnhancedReader) ReadEvent(ctx context.Context) (*Event, error) {
|
|
evt, err := r.reader.ReadEvent(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch evt.Header.Type {
|
|
case binlog.EventTypeQuery:
|
|
var qe binlog.QueryEvent
|
|
qe.Decode(evt.Buffer)
|
|
err = r.schemaMgr.ProcessQuery(string(qe.Schema), string(qe.Query))
|
|
}
|
|
|
|
return evt, err
|
|
}
|
|
|
|
// NextRowsEvent returns the next rows event for a whitelisted table. It blocks
|
|
// until next event is received or context is cancelled.
|
|
func (r *EnhancedReader) NextRowsEvent(ctx context.Context) (*EnhancedRowsEvent, error) {
|
|
for {
|
|
evt, err := r.reader.ReadEvent(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check if it's a rows event
|
|
if binlog.RowsEventVersion(evt.Header.Type) < 0 {
|
|
// fmt.Println("Not a rows event", evt.Header.Type.String())
|
|
continue
|
|
}
|
|
|
|
tbl := r.schemaMgr.Schema.Table(evt.Table.SchemaName, evt.Table.TableName)
|
|
if tbl == nil {
|
|
// Not whitelisted
|
|
continue
|
|
}
|
|
|
|
re, err := evt.DecodeRows()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ere := EnhancedRowsEvent{
|
|
Header: evt.Header,
|
|
Table: *evt.Table,
|
|
Rows: make([]map[string]interface{}, len(re.Rows)),
|
|
}
|
|
for i, row := range re.Rows {
|
|
erow := make(map[string]interface{}, len(row))
|
|
for j, val := range row {
|
|
col := tbl.Column(j)
|
|
if col == nil {
|
|
return nil, errors.New("column index undefined")
|
|
}
|
|
ct := mysql.ColumnType(evt.Table.ColumnTypes[j])
|
|
if !col.Unsigned {
|
|
val = signNumber(val, ct)
|
|
}
|
|
erow[col.Name] = val
|
|
}
|
|
ere.Rows[i] = erow
|
|
}
|
|
|
|
return &ere, nil
|
|
}
|
|
}
|
|
|
|
func (r *EnhancedReader) processEvent(evt Event) {
|
|
switch evt.Header.Type {
|
|
case binlog.EventTypeFormatDescription, binlog.EventTypeTableMap, binlog.EventTypeXID:
|
|
r.safepoint.Offset = evt.Offset
|
|
case binlog.EventTypeRotate:
|
|
r.safepoint = r.reader.state
|
|
}
|
|
}
|
|
|
|
// State returns current position in the binary log.
|
|
func (r *EnhancedReader) State() binlog.Position {
|
|
return r.reader.state
|
|
}
|
|
|
|
// Safepoint returns last encountered position that is considered safe to start
|
|
// with.
|
|
func (r *EnhancedReader) Safepoint() binlog.Position {
|
|
return r.safepoint
|
|
}
|
|
|
|
// Close underlying database connection.
|
|
func (r *EnhancedReader) Close() error {
|
|
return r.reader.Close()
|
|
}
|
|
|
|
func signNumber(val interface{}, ct mysql.ColumnType) interface{} {
|
|
switch tval := val.(type) {
|
|
case uint8:
|
|
return mysql.SignUint8(tval)
|
|
case uint16:
|
|
return mysql.SignUint16(tval)
|
|
case uint32:
|
|
if ct == mysql.ColumnTypeInt24 {
|
|
return mysql.SignUint24(tval)
|
|
}
|
|
return mysql.SignUint32(tval)
|
|
case uint64:
|
|
return mysql.SignUint64(tval)
|
|
default:
|
|
return val
|
|
}
|
|
}
|