1
0
Fork 0

Everything seem to work now

This commit is contained in:
Gregory Eremin 2018-11-11 14:24:45 +01:00
parent 9cd1d12583
commit 69353f61bc
9 changed files with 167 additions and 50 deletions

31
binlog/event_query.go Normal file
View File

@ -0,0 +1,31 @@
package binlog
import (
"github.com/localhots/bocadillo/tools"
)
// QueryEvent contains query details.
type QueryEvent struct {
SlaveProxyID uint32
ExecutionTime uint32
ErrorCode uint16
StatusVars []byte
Schema []byte
Query []byte
}
// Decode given buffer into a qeury event.
// Spec: https://dev.mysql.com/doc/internals/en/query-event.html
func (e *QueryEvent) Decode(connBuff []byte) {
buf := tools.NewBuffer(connBuff)
e.SlaveProxyID = buf.ReadUint32()
e.ExecutionTime = buf.ReadUint32()
schemaLen := int(buf.ReadUint8())
e.ErrorCode = buf.ReadUint16()
statusVarLen := int(buf.ReadUint8())
copy(e.StatusVars, buf.Read(statusVarLen))
copy(e.Schema, buf.Read(schemaLen))
buf.Skip(1) // Always 0x00
copy(e.Query, buf.Cur())
}

View File

@ -1,11 +1,15 @@
package binlog package binlog
import ( import (
"encoding/hex"
"errors"
"fmt" "fmt"
"runtime/debug"
"time" "time"
"github.com/localhots/bocadillo/mysql" "github.com/localhots/bocadillo/mysql"
"github.com/localhots/bocadillo/tools" "github.com/localhots/bocadillo/tools"
"github.com/localhots/pretty"
) )
// RowsEvent contains a Rows Event. // RowsEvent contains a Rows Event.
@ -40,8 +44,26 @@ func (e *RowsEvent) PeekTableID(connBuff []byte, fd FormatDescription) uint64 {
} }
// Decode decodes given buffer into a rows event event. // Decode decodes given buffer into a rows event event.
func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescription) error { func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescription) (err error) {
// pretty.Println(data) defer func() {
if errv := recover(); errv != nil {
tools.EnableDebug = true
tools.Debug("Recovered from panic in RowsEvent.Decode")
tools.Debug("Error:", errv)
tools.Debug("Format:", fd)
tools.Debug("Table:", td)
tools.Debug("Columns:")
for _, ctb := range td.ColumnTypes {
tools.Debug(" ", mysql.ColumnType(ctb).String())
}
tools.Debug("\nBuffer:")
tools.Debug(hex.Dump(connBuff))
tools.Debug("Stacktrace:")
debug.PrintStack()
err = errors.New(fmt.Sprint(errv))
}
}()
buf := tools.NewBuffer(connBuff) buf := tools.NewBuffer(connBuff)
idSize := fd.TableIDSize(e.Type) idSize := fd.TableIDSize(e.Type)
if idSize == 6 { if idSize == 6 {
@ -65,23 +87,26 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri
e.ColumnBitmap2 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8) e.ColumnBitmap2 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8)
} }
// pretty.Println(e.Type.String(), buf.Cur())
e.Rows = make([][]interface{}, 0) e.Rows = make([][]interface{}, 0)
for buf.More() { for {
tools.Debug("\n\n=== PARSING ROW\n")
row, err := e.decodeRows(buf, td, e.ColumnBitmap1) row, err := e.decodeRows(buf, td, e.ColumnBitmap1)
if err != nil { if err != nil {
return err return err
} }
e.Rows = append(e.Rows, row) e.Rows = append(e.Rows, row)
if RowsEventHasSecondBitmap(e.Type) { if RowsEventHasSecondBitmap(e.Type) { // && buf.More()
tools.Debug("===== SECOND BITMASK ROUND =====\n")
row, err := e.decodeRows(buf, td, e.ColumnBitmap2) row, err := e.decodeRows(buf, td, e.ColumnBitmap2)
if err != nil { if err != nil {
return err return err
} }
e.Rows = append(e.Rows, row) e.Rows = append(e.Rows, row)
} }
if !buf.More() {
break
}
} }
return nil return nil
} }
@ -100,25 +125,32 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte
row := make([]interface{}, e.ColumnCount) row := make([]interface{}, e.ColumnCount)
for i := 0; i < int(e.ColumnCount); i++ { for i := 0; i < int(e.ColumnCount); i++ {
if !isBitSet(bm, i) { if !isBitSet(bm, i) {
tools.Debugf("Skipped %s, meta %x, BIT NOT SET\n\n",
mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i],
)
continue continue
} }
isNull := (uint32(nullBM[nullIdx/8]) >> uint32(nullIdx%8)) & 1 isNull := (uint32(nullBM[nullIdx/8]) >> uint32(nullIdx%8)) & 1
nullIdx++ nullIdx++
if isNull > 0 { if isNull > 0 {
tools.Debugf("Parsed %s, meta %x, NULL\n\n",
mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i],
)
row[i] = nil row[i] = nil
continue continue
} }
row[i] = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i]) row[i] = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i])
// fmt.Printf("Parsed %s, meta %x, value %++v\n", tools.Debugf("Parsed %s, meta %x, value %++v\n\n",
// mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], row[i], mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], row[i],
// ) )
} }
return row, nil return row, nil
} }
func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) interface{} { func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) interface{} {
tools.Debugf("-- PRE-PARSING %s, meta %x\n", ct.String(), meta)
var length int var length int
if ct == mysql.ColumnTypeString { if ct == mysql.ColumnTypeString {
if meta > 0xFF { if meta > 0xFF {
@ -136,6 +168,8 @@ func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uin
} }
} }
tools.Debugf("-- PARSING %s, meta %x\n", ct.String(), meta)
switch ct { switch ct {
case mysql.ColumnTypeNull: case mysql.ColumnTypeNull:
return nil return nil
@ -215,11 +249,16 @@ func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uin
case mysql.ColumnTypeBit: case mysql.ColumnTypeBit:
nbits := int(((meta >> 8) * 8) + (meta & 0xFF)) nbits := int(((meta >> 8) * 8) + (meta & 0xFF))
length = int(nbits+7) / 8 length = int(nbits+7) / 8
return mysql.DecodeBit(buf.Cur(), nbits, length) v, n := mysql.DecodeBit(buf.Cur(), nbits, length)
buf.Skip(n)
return v
case mysql.ColumnTypeSet: case mysql.ColumnTypeSet:
length = int(meta & 0xFF) length = int(meta & 0xFF)
nbits := length * 8 nbits := length * 8
return mysql.DecodeBit(buf.Cur(), nbits, length) v, n := mysql.DecodeBit(buf.Cur(), nbits, length)
pretty.Println("Decoding set", buf.Cur(), nbits, length, "-->", v)
buf.Skip(n)
return v
// Stuff // Stuff
case mysql.ColumnTypeEnum: case mysql.ColumnTypeEnum:

15
binlog/event_xid.go Normal file
View File

@ -0,0 +1,15 @@
package binlog
import "github.com/localhots/bocadillo/mysql"
// XIDEvent contains an XID (XA transaction identifier)
// https://dev.mysql.com/doc/refman/5.7/en/xa.html
type XIDEvent struct {
XID uint64
}
// Decode decodes given buffer into an XID event.
// Spec: https://dev.mysql.com/doc/internals/en/xid-event.html
func (e *XIDEvent) Decode(connBuff []byte) {
e.XID = mysql.DecodeUint64(connBuff)
}

View File

@ -1,9 +1,9 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"log"
"os" "os"
"time" "time"
@ -17,7 +17,6 @@ func main() {
offset := flag.Uint("offset", 0, "Log offset in bytes") offset := flag.Uint("offset", 0, "Log offset in bytes")
flag.Parse() flag.Parse()
ctx := context.Background()
validate((*dsn != ""), "Database source name is not set") validate((*dsn != ""), "Database source name is not set")
validate((*id != 0), "Server ID is not set") validate((*id != 0), "Server ID is not set")
validate((*file != ""), "Binary log file is not set") validate((*file != ""), "Binary log file is not set")
@ -34,7 +33,7 @@ func main() {
reader, err := reader.NewReader(conn) reader, err := reader.NewReader(conn)
if err != nil { if err != nil {
log.Fatalf( "Failed to create reader: %v", err) log.Fatalf("Failed to create reader: %v", err)
} }
off := conf.Offset off := conf.Offset
@ -45,7 +44,7 @@ func main() {
log.Fatalf("Failed to read event: %v", err) log.Fatalf("Failed to read event: %v", err)
} }
ts := time.Unix(int64(evt.Header.Timestamp), 0).Format(time.RFC3339) ts := time.Unix(int64(evt.Header.Timestamp), 0).Format(time.RFC3339)
log.Printf("Event received: %s %d, %d\n", evt.Header.Type.String(), ts, off4 }) log.Printf("Event received: %s %s, %d\n", evt.Header.Type.String(), ts, off)
off = evt.Header.NextOffset off = evt.Header.NextOffset
} }
} }

View File

@ -259,9 +259,9 @@ func DecodeFloat64(data []byte) float64 {
} }
// DecodeBit decodes a bit into not less than 8 bytes. // DecodeBit decodes a bit into not less than 8 bytes.
func DecodeBit(data []byte, nbits int, length int) uint64 { func DecodeBit(data []byte, nbits int, length int) (v uint64, n int) {
if nbits > 1 { if nbits > 1 {
return DecodeVarLen64(data, length) return DecodeVarLen64(data, length), length
} }
return uint64(data[0]) return uint64(data[0]), 1
} }

View File

@ -17,10 +17,12 @@ type Reader struct {
// Event ... // Event ...
type Event struct { type Event struct {
Format binlog.FormatDescription
Header binlog.EventHeader Header binlog.EventHeader
Body []byte Buffer []byte
// Table is not empty for rows events
Table *binlog.TableDescription Table *binlog.TableDescription
Rows *binlog.RowsEvent
} }
// NewReader ... // NewReader ...
@ -47,10 +49,10 @@ func NewReader(conn *SlaveConn) (*Reader, error) {
func (r *Reader) ReadEvent() (*Event, error) { func (r *Reader) ReadEvent() (*Event, error) {
connBuff, err := r.conn.ReadPacket() connBuff, err := r.conn.ReadPacket()
if err != nil { if err != nil {
return nil, err return nil, errors.Annotate(err, "read next event")
} }
var evt Event evt := Event{Format: r.format}
if err := evt.Header.Decode(connBuff, r.format); err != nil { if err := evt.Header.Decode(connBuff, r.format); err != nil {
return nil, errors.Annotate(err, "decode event header") return nil, errors.Annotate(err, "decode event header")
} }
@ -59,34 +61,36 @@ func (r *Reader) ReadEvent() (*Event, error) {
r.state.Offset = uint64(evt.Header.NextOffset) r.state.Offset = uint64(evt.Header.NextOffset)
} }
evt.Body = connBuff[r.format.HeaderLen():] evt.Buffer = connBuff[r.format.HeaderLen():]
csa := r.format.ServerDetails.ChecksumAlgorithm csa := r.format.ServerDetails.ChecksumAlgorithm
if evt.Header.Type != binlog.EventTypeFormatDescription && csa == binlog.ChecksumAlgorithmCRC32 { if evt.Header.Type != binlog.EventTypeFormatDescription && csa == binlog.ChecksumAlgorithmCRC32 {
evt.Body = evt.Body[:len(evt.Body)-4] // Remove trailing CRC32 checksum, we're not going to verify it
evt.Buffer = evt.Buffer[:len(evt.Buffer)-4]
} }
// pretty.Println(h)
switch evt.Header.Type { switch evt.Header.Type {
case binlog.EventTypeFormatDescription: case binlog.EventTypeFormatDescription:
var fde binlog.FormatDescriptionEvent var fde binlog.FormatDescriptionEvent
if err = fde.Decode(evt.Body); err == nil { err = fde.Decode(evt.Buffer)
r.format = fde.FormatDescription if err != nil {
return nil, errors.Annotate(err, "decode format description event")
} }
// pretty.Println(evt.Header.Type.String(), r.format) r.format = fde.FormatDescription
evt.Format = fde.FormatDescription
case binlog.EventTypeRotate: case binlog.EventTypeRotate:
var re binlog.RotateEvent var re binlog.RotateEvent
if err = re.Decode(evt.Body, r.format); err == nil { err = re.Decode(evt.Buffer, r.format)
r.state = re.NextFile if err != nil {
return nil, errors.Annotate(err, "decode rotate event")
} }
// pretty.Println(evt.Header.Type.String(), r.state) r.state = re.NextFile
case binlog.EventTypeTableMap: case binlog.EventTypeTableMap:
var tme binlog.TableMapEvent var tme binlog.TableMapEvent
if err = tme.Decode(evt.Body, r.format); err == nil { err = tme.Decode(evt.Buffer, r.format)
r.tableMap[tme.TableID] = tme.TableDescription if err != nil {
return nil, errors.Annotate(err, "decode table map event")
} }
// pretty.Println(evt.Header.Type.String(), tm) r.tableMap[tme.TableID] = tme.TableDescription
case binlog.EventTypeWriteRowsV0, case binlog.EventTypeWriteRowsV0,
binlog.EventTypeWriteRowsV1, binlog.EventTypeWriteRowsV1,
binlog.EventTypeWriteRowsV2, binlog.EventTypeWriteRowsV2,
@ -98,22 +102,18 @@ func (r *Reader) ReadEvent() (*Event, error) {
binlog.EventTypeDeleteRowsV2: binlog.EventTypeDeleteRowsV2:
re := binlog.RowsEvent{Type: evt.Header.Type} re := binlog.RowsEvent{Type: evt.Header.Type}
tableID := re.PeekTableID(evt.Body, r.format) tableID := re.PeekTableID(evt.Buffer, r.format)
td, ok := r.tableMap[tableID] td, ok := r.tableMap[tableID]
if !ok { if !ok {
return nil, errors.New("Unknown table ID") return nil, errors.New("Unknown table ID")
} }
if err = re.Decode(evt.Body, r.format, td); err != nil {
return nil, err
}
evt.Table = &td evt.Table = &td
evt.Rows = &re case binlog.EventTypeQuery:
// Can be decoded by the receiver
case binlog.EventTypeXID: case binlog.EventTypeXID:
// TODO: Add support // Can be decoded by the receiver
case binlog.EventTypeGTID: case binlog.EventTypeGTID:
// TODO: Add support // TODO: Add support
case binlog.EventTypeQuery:
// TODO: Handle schema changes
} }
return &evt, err return &evt, err

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/localhots/bocadillo/binlog"
"github.com/localhots/bocadillo/mysql" "github.com/localhots/bocadillo/mysql"
"github.com/localhots/bocadillo/reader" "github.com/localhots/bocadillo/reader"
) )
@ -199,12 +200,16 @@ func (s *testSuite) expectValue(t *testing.T, tbl *table, exp interface{}) {
return return
} }
if evt.Table != nil && evt.Table.TableName == tbl.name { if evt.Table != nil && evt.Table.TableName == tbl.name {
// pretty.Println(evt) re := binlog.RowsEvent{Type: evt.Header.Type}
if len(evt.Rows.Rows) != 1 && len(evt.Rows.Rows[0]) != 1 { err := re.Decode(evt.Buffer, evt.Format, *evt.Table)
panic("Expected one row with one value") if err != nil {
t.Fatalf("Failed to decode rows event: %v", err)
}
if len(re.Rows) != 1 && len(re.Rows[0]) != 1 {
t.Fatal("Expected 1 row with 1 value")
} }
out <- evt.Rows.Rows[0][0] out <- re.Rows[0][0]
return return
} }
} }

View File

@ -28,12 +28,15 @@ func NewCommandBuffer(size int) *Buffer {
// Skip advances the cursor by N bytes. // Skip advances the cursor by N bytes.
func (b *Buffer) Skip(n int) { func (b *Buffer) Skip(n int) {
Debugf("Skipped %d bytes: %X\n", n, b.data[b.pos:b.pos+n])
b.pos += n b.pos += n
} }
// Read returns next N bytes and advances the cursor. // Read returns next N bytes and advances the cursor.
func (b *Buffer) Read(n int) []byte { func (b *Buffer) Read(n int) []byte {
b.pos += n b.pos += n
Debugf("Read %d bytes: %X\n", n, b.data[b.pos-n:b.pos])
return b.data[b.pos-n:] return b.data[b.pos-n:]
} }
@ -44,7 +47,8 @@ func (b *Buffer) Cur() []byte {
// More returns true if there's more to read. // More returns true if there's more to read.
func (b *Buffer) More() bool { func (b *Buffer) More() bool {
return b.pos < len(b.data) Debug("*** BUFFER BOUNDS CHECK len:", len(b.data), "pos:", b.pos)
return b.pos < len(b.data)-1
} }
// Bytes returns entire buffer contents. // Bytes returns entire buffer contents.

24
tools/debug.go Normal file
View File

@ -0,0 +1,24 @@
package tools
import (
"fmt"
"github.com/localhots/pretty"
)
// EnableDebug controls debug output.
var EnableDebug = false
// Debug ...
func Debug(vals ...interface{}) {
if EnableDebug {
pretty.Println(vals...)
}
}
// Debugf ...
func Debugf(format string, args ...interface{}) {
if EnableDebug {
fmt.Printf(format, args...)
}
}