diff --git a/binlog/event_query.go b/binlog/event_query.go new file mode 100644 index 0000000..efe8a46 --- /dev/null +++ b/binlog/event_query.go @@ -0,0 +1,31 @@ +package binlog + +import ( + "github.com/localhots/bocadillo/tools" +) + +// QueryEvent contains query details. +type QueryEvent struct { + SlaveProxyID uint32 + ExecutionTime uint32 + ErrorCode uint16 + StatusVars []byte + Schema []byte + Query []byte +} + +// Decode given buffer into a qeury event. +// Spec: https://dev.mysql.com/doc/internals/en/query-event.html +func (e *QueryEvent) Decode(connBuff []byte) { + buf := tools.NewBuffer(connBuff) + + e.SlaveProxyID = buf.ReadUint32() + e.ExecutionTime = buf.ReadUint32() + schemaLen := int(buf.ReadUint8()) + e.ErrorCode = buf.ReadUint16() + statusVarLen := int(buf.ReadUint8()) + copy(e.StatusVars, buf.Read(statusVarLen)) + copy(e.Schema, buf.Read(schemaLen)) + buf.Skip(1) // Always 0x00 + copy(e.Query, buf.Cur()) +} diff --git a/binlog/event_rows.go b/binlog/event_rows.go index ed720f9..b7c00c0 100644 --- a/binlog/event_rows.go +++ b/binlog/event_rows.go @@ -1,11 +1,15 @@ package binlog import ( + "encoding/hex" + "errors" "fmt" + "runtime/debug" "time" "github.com/localhots/bocadillo/mysql" "github.com/localhots/bocadillo/tools" + "github.com/localhots/pretty" ) // RowsEvent contains a Rows Event. @@ -40,8 +44,26 @@ func (e *RowsEvent) PeekTableID(connBuff []byte, fd FormatDescription) uint64 { } // Decode decodes given buffer into a rows event event. -func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescription) error { - // pretty.Println(data) +func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescription) (err error) { + defer func() { + if errv := recover(); errv != nil { + tools.EnableDebug = true + tools.Debug("Recovered from panic in RowsEvent.Decode") + tools.Debug("Error:", errv) + tools.Debug("Format:", fd) + tools.Debug("Table:", td) + tools.Debug("Columns:") + for _, ctb := range td.ColumnTypes { + tools.Debug(" ", mysql.ColumnType(ctb).String()) + } + tools.Debug("\nBuffer:") + tools.Debug(hex.Dump(connBuff)) + tools.Debug("Stacktrace:") + debug.PrintStack() + err = errors.New(fmt.Sprint(errv)) + } + }() + buf := tools.NewBuffer(connBuff) idSize := fd.TableIDSize(e.Type) if idSize == 6 { @@ -65,23 +87,26 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri e.ColumnBitmap2 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8) } - // pretty.Println(e.Type.String(), buf.Cur()) - e.Rows = make([][]interface{}, 0) - for buf.More() { + for { + tools.Debug("\n\n=== PARSING ROW\n") row, err := e.decodeRows(buf, td, e.ColumnBitmap1) if err != nil { return err } e.Rows = append(e.Rows, row) - if RowsEventHasSecondBitmap(e.Type) { + if RowsEventHasSecondBitmap(e.Type) { // && buf.More() + tools.Debug("===== SECOND BITMASK ROUND =====\n") row, err := e.decodeRows(buf, td, e.ColumnBitmap2) if err != nil { return err } e.Rows = append(e.Rows, row) } + if !buf.More() { + break + } } return nil } @@ -100,25 +125,32 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte row := make([]interface{}, e.ColumnCount) for i := 0; i < int(e.ColumnCount); i++ { if !isBitSet(bm, i) { + tools.Debugf("Skipped %s, meta %x, BIT NOT SET\n\n", + mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], + ) continue } isNull := (uint32(nullBM[nullIdx/8]) >> uint32(nullIdx%8)) & 1 nullIdx++ if isNull > 0 { + tools.Debugf("Parsed %s, meta %x, NULL\n\n", + mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], + ) row[i] = nil continue } row[i] = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i]) - // fmt.Printf("Parsed %s, meta %x, value %++v\n", - // mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], row[i], - // ) + tools.Debugf("Parsed %s, meta %x, value %++v\n\n", + mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], row[i], + ) } return row, nil } func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) interface{} { + tools.Debugf("-- PRE-PARSING %s, meta %x\n", ct.String(), meta) var length int if ct == mysql.ColumnTypeString { if meta > 0xFF { @@ -136,6 +168,8 @@ func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uin } } + tools.Debugf("-- PARSING %s, meta %x\n", ct.String(), meta) + switch ct { case mysql.ColumnTypeNull: return nil @@ -215,11 +249,16 @@ func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uin case mysql.ColumnTypeBit: nbits := int(((meta >> 8) * 8) + (meta & 0xFF)) length = int(nbits+7) / 8 - return mysql.DecodeBit(buf.Cur(), nbits, length) + v, n := mysql.DecodeBit(buf.Cur(), nbits, length) + buf.Skip(n) + return v case mysql.ColumnTypeSet: length = int(meta & 0xFF) nbits := length * 8 - return mysql.DecodeBit(buf.Cur(), nbits, length) + v, n := mysql.DecodeBit(buf.Cur(), nbits, length) + pretty.Println("Decoding set", buf.Cur(), nbits, length, "-->", v) + buf.Skip(n) + return v // Stuff case mysql.ColumnTypeEnum: diff --git a/binlog/event_xid.go b/binlog/event_xid.go new file mode 100644 index 0000000..b24f259 --- /dev/null +++ b/binlog/event_xid.go @@ -0,0 +1,15 @@ +package binlog + +import "github.com/localhots/bocadillo/mysql" + +// XIDEvent contains an XID (XA transaction identifier) +// https://dev.mysql.com/doc/refman/5.7/en/xa.html +type XIDEvent struct { + XID uint64 +} + +// Decode decodes given buffer into an XID event. +// Spec: https://dev.mysql.com/doc/internals/en/xid-event.html +func (e *XIDEvent) Decode(connBuff []byte) { + e.XID = mysql.DecodeUint64(connBuff) +} diff --git a/cmd/main.go b/cmd/main.go index 74161c7..e2650e0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,9 +1,9 @@ package main import ( - "context" "flag" "fmt" + "log" "os" "time" @@ -17,7 +17,6 @@ func main() { offset := flag.Uint("offset", 0, "Log offset in bytes") flag.Parse() - ctx := context.Background() validate((*dsn != ""), "Database source name is not set") validate((*id != 0), "Server ID is not set") validate((*file != ""), "Binary log file is not set") @@ -34,7 +33,7 @@ func main() { reader, err := reader.NewReader(conn) if err != nil { - log.Fatalf( "Failed to create reader: %v", err) + log.Fatalf("Failed to create reader: %v", err) } off := conf.Offset @@ -45,7 +44,7 @@ func main() { log.Fatalf("Failed to read event: %v", err) } ts := time.Unix(int64(evt.Header.Timestamp), 0).Format(time.RFC3339) - log.Printf("Event received: %s %d, %d\n", evt.Header.Type.String(), ts, off4 }) + log.Printf("Event received: %s %s, %d\n", evt.Header.Type.String(), ts, off) off = evt.Header.NextOffset } } diff --git a/mysql/binary.go b/mysql/binary.go index a36af39..707f58d 100644 --- a/mysql/binary.go +++ b/mysql/binary.go @@ -259,9 +259,9 @@ func DecodeFloat64(data []byte) float64 { } // DecodeBit decodes a bit into not less than 8 bytes. -func DecodeBit(data []byte, nbits int, length int) uint64 { +func DecodeBit(data []byte, nbits int, length int) (v uint64, n int) { if nbits > 1 { - return DecodeVarLen64(data, length) + return DecodeVarLen64(data, length), length } - return uint64(data[0]) + return uint64(data[0]), 1 } diff --git a/reader/reader.go b/reader/reader.go index f04faa8..ff9e819 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -17,10 +17,12 @@ type Reader struct { // Event ... type Event struct { + Format binlog.FormatDescription Header binlog.EventHeader - Body []byte - Table *binlog.TableDescription - Rows *binlog.RowsEvent + Buffer []byte + + // Table is not empty for rows events + Table *binlog.TableDescription } // NewReader ... @@ -47,10 +49,10 @@ func NewReader(conn *SlaveConn) (*Reader, error) { func (r *Reader) ReadEvent() (*Event, error) { connBuff, err := r.conn.ReadPacket() if err != nil { - return nil, err + return nil, errors.Annotate(err, "read next event") } - var evt Event + evt := Event{Format: r.format} if err := evt.Header.Decode(connBuff, r.format); err != nil { return nil, errors.Annotate(err, "decode event header") } @@ -59,34 +61,36 @@ func (r *Reader) ReadEvent() (*Event, error) { r.state.Offset = uint64(evt.Header.NextOffset) } - evt.Body = connBuff[r.format.HeaderLen():] - + evt.Buffer = connBuff[r.format.HeaderLen():] csa := r.format.ServerDetails.ChecksumAlgorithm if evt.Header.Type != binlog.EventTypeFormatDescription && csa == binlog.ChecksumAlgorithmCRC32 { - evt.Body = evt.Body[:len(evt.Body)-4] + // Remove trailing CRC32 checksum, we're not going to verify it + evt.Buffer = evt.Buffer[:len(evt.Buffer)-4] } - // pretty.Println(h) - switch evt.Header.Type { case binlog.EventTypeFormatDescription: var fde binlog.FormatDescriptionEvent - if err = fde.Decode(evt.Body); err == nil { - r.format = fde.FormatDescription + err = fde.Decode(evt.Buffer) + if err != nil { + return nil, errors.Annotate(err, "decode format description event") } - // pretty.Println(evt.Header.Type.String(), r.format) + r.format = fde.FormatDescription + evt.Format = fde.FormatDescription case binlog.EventTypeRotate: var re binlog.RotateEvent - if err = re.Decode(evt.Body, r.format); err == nil { - r.state = re.NextFile + err = re.Decode(evt.Buffer, r.format) + if err != nil { + return nil, errors.Annotate(err, "decode rotate event") } - // pretty.Println(evt.Header.Type.String(), r.state) + r.state = re.NextFile case binlog.EventTypeTableMap: var tme binlog.TableMapEvent - if err = tme.Decode(evt.Body, r.format); err == nil { - r.tableMap[tme.TableID] = tme.TableDescription + err = tme.Decode(evt.Buffer, r.format) + if err != nil { + return nil, errors.Annotate(err, "decode table map event") } - // pretty.Println(evt.Header.Type.String(), tm) + r.tableMap[tme.TableID] = tme.TableDescription case binlog.EventTypeWriteRowsV0, binlog.EventTypeWriteRowsV1, binlog.EventTypeWriteRowsV2, @@ -98,22 +102,18 @@ func (r *Reader) ReadEvent() (*Event, error) { binlog.EventTypeDeleteRowsV2: re := binlog.RowsEvent{Type: evt.Header.Type} - tableID := re.PeekTableID(evt.Body, r.format) + tableID := re.PeekTableID(evt.Buffer, r.format) td, ok := r.tableMap[tableID] if !ok { return nil, errors.New("Unknown table ID") } - if err = re.Decode(evt.Body, r.format, td); err != nil { - return nil, err - } evt.Table = &td - evt.Rows = &re + case binlog.EventTypeQuery: + // Can be decoded by the receiver case binlog.EventTypeXID: - // TODO: Add support + // Can be decoded by the receiver case binlog.EventTypeGTID: // TODO: Add support - case binlog.EventTypeQuery: - // TODO: Handle schema changes } return &evt, err diff --git a/tests/suite_test.go b/tests/suite_test.go index 08b1e42..19575dd 100644 --- a/tests/suite_test.go +++ b/tests/suite_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/localhots/bocadillo/binlog" "github.com/localhots/bocadillo/mysql" "github.com/localhots/bocadillo/reader" ) @@ -199,12 +200,16 @@ func (s *testSuite) expectValue(t *testing.T, tbl *table, exp interface{}) { return } if evt.Table != nil && evt.Table.TableName == tbl.name { - // pretty.Println(evt) - if len(evt.Rows.Rows) != 1 && len(evt.Rows.Rows[0]) != 1 { - panic("Expected one row with one value") + re := binlog.RowsEvent{Type: evt.Header.Type} + err := re.Decode(evt.Buffer, evt.Format, *evt.Table) + if err != nil { + t.Fatalf("Failed to decode rows event: %v", err) + } + if len(re.Rows) != 1 && len(re.Rows[0]) != 1 { + t.Fatal("Expected 1 row with 1 value") } - out <- evt.Rows.Rows[0][0] + out <- re.Rows[0][0] return } } diff --git a/tools/buffer.go b/tools/buffer.go index 7851c34..ce68071 100644 --- a/tools/buffer.go +++ b/tools/buffer.go @@ -28,12 +28,15 @@ func NewCommandBuffer(size int) *Buffer { // Skip advances the cursor by N bytes. func (b *Buffer) Skip(n int) { + Debugf("Skipped %d bytes: %X\n", n, b.data[b.pos:b.pos+n]) b.pos += n } // Read returns next N bytes and advances the cursor. func (b *Buffer) Read(n int) []byte { b.pos += n + + Debugf("Read %d bytes: %X\n", n, b.data[b.pos-n:b.pos]) return b.data[b.pos-n:] } @@ -44,7 +47,8 @@ func (b *Buffer) Cur() []byte { // More returns true if there's more to read. func (b *Buffer) More() bool { - return b.pos < len(b.data) + Debug("*** BUFFER BOUNDS CHECK len:", len(b.data), "pos:", b.pos) + return b.pos < len(b.data)-1 } // Bytes returns entire buffer contents. diff --git a/tools/debug.go b/tools/debug.go new file mode 100644 index 0000000..42ee887 --- /dev/null +++ b/tools/debug.go @@ -0,0 +1,24 @@ +package tools + +import ( + "fmt" + + "github.com/localhots/pretty" +) + +// EnableDebug controls debug output. +var EnableDebug = false + +// Debug ... +func Debug(vals ...interface{}) { + if EnableDebug { + pretty.Println(vals...) + } +} + +// Debugf ... +func Debugf(format string, args ...interface{}) { + if EnableDebug { + fmt.Printf(format, args...) + } +}