diff --git a/binlog/event_rows.go b/binlog/event_rows.go index 5210855..a9a62ad 100644 --- a/binlog/event_rows.go +++ b/binlog/event_rows.go @@ -1,6 +1,9 @@ package binlog import ( + "fmt" + "time" + "github.com/localhots/bocadillo/mysql" "github.com/localhots/bocadillo/tools" "github.com/localhots/pretty" @@ -63,13 +66,28 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri e.ColumnBitmap2 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8) } - pretty.Println(e.Type.String(), e, td, buf.Cur()) + // pretty.Println(e.Type.String(), buf.Cur()) - e.decodeRows(buf, td, e.ColumnBitmap1) + e.Rows = make([][]interface{}, 0) + for buf.More() { + row, err := e.decodeRows(buf, td, e.ColumnBitmap1) + if err != nil { + return err + } + e.Rows = append(e.Rows, row) + + if RowsEventHasSecondBitmap(e.Type) { + row, err := e.decodeRows(buf, td, e.ColumnBitmap2) + if err != nil { + return err + } + e.Rows = append(e.Rows, row) + } + } return nil } -func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte) { +func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte) ([]interface{}, error) { count := 0 for i := 0; i < int(e.ColumnCount); i++ { if isBitSet(bm, i) { @@ -81,8 +99,6 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte nullBM := buf.ReadStringVarLen(count) nullIdx := 0 row := make([]interface{}, e.ColumnCount) - - var err error for i := 0; i < int(e.ColumnCount); i++ { if !isBitSet(bm, i) { continue @@ -95,73 +111,159 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte continue } - row[i], err = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i]) - - if err != nil { - panic(err) - } + row[i] = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i]) + // pretty.Println("PARSED", mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], row[i]) } + return row, nil } -func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) (interface{}, error) { - switch ct { - case mysql.ColumnTypeDecimal: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeTiny: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeShort: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeLong: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeFloat: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeDouble: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeNull: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeTimestamp: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeLonglong: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeInt24: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeDate: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeTime: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeDatetime: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeYear: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeVarchar: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeBit: - pretty.Println("Type", ct.String()) - - case mysql.ColumnTypeJSON: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeNewDecimal: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeEnum: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeSet: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeTinyblob: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeMediumblob: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeLongblob: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeBlob: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeVarstring: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeString: - pretty.Println("Type", ct.String()) - case mysql.ColumnTypeGeometry: - pretty.Println("Type", ct.String()) +func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) interface{} { + var length int + if ct == mysql.ColumnTypeString { + if meta > 0xFF { + typeByte := uint8(meta >> 8) + lengthByte := uint8(meta & 0xFF) + if typeByte&0x30 != 0x30 { + ct = mysql.ColumnType(typeByte | 0x30) + length = int(uint16(lengthByte) | (uint16((typeByte&0x30)^0x30) << 4)) + } else { + ct = mysql.ColumnType(typeByte) + length = int(lengthByte) + } + } else { + length = int(meta) + } } - return nil, nil + + switch ct { + case mysql.ColumnTypeNull: + return nil + + // Integer + case mysql.ColumnTypeTiny: + return buf.ReadUint8() + case mysql.ColumnTypeShort: + return buf.ReadUint16() + case mysql.ColumnTypeInt24: + return buf.ReadUint24() + case mysql.ColumnTypeLong: + return buf.ReadUint32() + case mysql.ColumnTypeLonglong: + return buf.ReadUint64() + + // Float + case mysql.ColumnTypeFloat: + return buf.ReadFloat32() + case mysql.ColumnTypeDouble: + return buf.ReadFloat64() + + // Decimals + case mysql.ColumnTypeNewDecimal: + precision := int(meta >> 8) + decimals := int(meta & 0xFF) + dec, n := mysql.DecodeDecimal(buf.Cur(), precision, decimals) + buf.Skip(n) + return dec + + // Date and Time + case mysql.ColumnTypeYear: + return uint16(buf.ReadUint8()) + 1900 + case mysql.ColumnTypeDate: + v := buf.ReadUint24() + if v == 0 { + return "0000-00-00" + } + return fmt.Sprintf("%04d-%02d-%02d", v/(16*32), v/32%16, v%32) + case mysql.ColumnTypeTime: + v := buf.ReadUint24() + if v == 0 { + return "00:00:00" + } + var sign string + if v < 0 { + sign = "-" + } + return fmt.Sprintf("%s%02d:%02d:%02d", sign, v/10000, (v%10000)/100, v%100) + case mysql.ColumnTypeTime2: + v, n := mysql.DecodeTime2(buf.Cur(), meta) + buf.Skip(n) + return v + case mysql.ColumnTypeTimestamp: + ts := buf.ReadUint32() + return mysql.FracTime{Time: time.Unix(int64(ts), 0)}.String() + case mysql.ColumnTypeTimestamp2: + v, n := mysql.DecodeTimestamp2(buf.Cur(), meta) + buf.Skip(n) + return v + case mysql.ColumnTypeDatetime: + v := buf.ReadUint64() + d := v / 1000000 + t := v % 1000000 + return mysql.FracTime{Time: time.Date(int(d/10000), + time.Month((d%10000)/100), + int(d%100), + int(t/10000), + int((t%10000)/100), + int(t%100), + 0, + time.UTC)}.String() + case mysql.ColumnTypeDatetime2: + v, n := mysql.DecodeDatetime2(buf.Cur(), meta) + buf.Skip(n) + return v + + // Strings + // FIXME + case mysql.ColumnTypeString: + return readString(buf, length) + case mysql.ColumnTypeVarchar, mysql.ColumnTypeVarstring: + return readString(buf, int(meta)) + + // Blobs + case mysql.ColumnTypeBlob, mysql.ColumnTypeGeometry, mysql.ColumnTypeJSON: + return buf.ReadStringVarEnc(int(meta)) + case mysql.ColumnTypeTinyblob: + return buf.ReadStringVarEnc(1) + case mysql.ColumnTypeMediumblob: + return buf.ReadStringVarEnc(3) + case mysql.ColumnTypeLongblob: + return buf.ReadStringVarEnc(4) + + // Bits + case mysql.ColumnTypeBit: + nbits := int(((meta >> 8) * 8) + (meta & 0xFF)) + length = int(nbits+7) / 8 + return mysql.DecodeBit(buf.Cur(), nbits, length) + case mysql.ColumnTypeSet: + length = int(meta & 0xFF) + nbits := length * 8 + return mysql.DecodeBit(buf.Cur(), nbits, length) + + // Stuff + case mysql.ColumnTypeEnum: + return buf.ReadVarLen64(int(meta & 0xFF)) + + // Unsupported + case mysql.ColumnTypeDecimal: + // Old decimal + fallthrough + case mysql.ColumnTypeNewDate: + // Too new + fallthrough + default: + pretty.Printf("UNSUPPORTED Type %d %s %x %x\n", ct, ct.String(), meta, buf.Cur()) + } + + return nil +} + +// FIXME: Something is fishy with this whole string decoding. It seems like it +// could be simplified greatly +func readString(buf *tools.Buffer, length int) string { + if length < 256 { + return string(buf.ReadStringVarEnc(1)) + } + return string(buf.ReadStringVarEnc(2)) } func isBitSet(bm []byte, i int) bool { diff --git a/reader/reader.go b/reader/reader.go index 85f54a0..74d0234 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -4,7 +4,6 @@ import ( "github.com/juju/errors" "github.com/localhots/bocadillo/binlog" "github.com/localhots/bocadillo/schema" - "github.com/localhots/pretty" ) // Reader ... @@ -20,6 +19,8 @@ type Reader struct { type Event struct { Header binlog.EventHeader Body []byte + Table *binlog.TableDescription + Rows *binlog.RowsEvent } // NewReader ... @@ -73,13 +74,13 @@ func (r *Reader) ReadEvent() (*Event, error) { if err = fde.Decode(evt.Body); err == nil { r.format = fde.FormatDescription } - pretty.Println(evt.Header.Type.String(), r.format) + // pretty.Println(evt.Header.Type.String(), r.format) case binlog.EventTypeRotate: var re binlog.RotateEvent if err = re.Decode(evt.Body, r.format); err == nil { r.state = re.NextFile } - pretty.Println(evt.Header.Type.String(), r.state) + // pretty.Println(evt.Header.Type.String(), r.state) case binlog.EventTypeTableMap: var tme binlog.TableMapEvent if err = tme.Decode(evt.Body, r.format); err == nil { @@ -102,9 +103,11 @@ func (r *Reader) ReadEvent() (*Event, error) { if !ok { return nil, errors.New("Unknown table ID") } - if err = re.Decode(evt.Body, r.format, td); err == nil { - pretty.Println(re) + if err = re.Decode(evt.Body, r.format, td); err != nil { + return nil, err } + evt.Table = &td + evt.Rows = &re case binlog.EventTypeXID: // TODO: Add support case binlog.EventTypeGTID: diff --git a/reader/slave_conn.go b/reader/slave_conn.go index fd93a2e..0da9718 100644 --- a/reader/slave_conn.go +++ b/reader/slave_conn.go @@ -2,7 +2,6 @@ package reader import ( "database/sql/driver" - "encoding/hex" "fmt" "io" "os" @@ -101,7 +100,6 @@ func (c *SlaveConn) RegisterSlave() error { // buf.WriteUint32(replicationRank) // buf.WriteUint32(masterID) - fmt.Println(hex.Dump(buf.Bytes())) return c.runCmd(buf.Bytes()) } diff --git a/tools/buffer.go b/tools/buffer.go index 182877c..0213366 100644 --- a/tools/buffer.go +++ b/tools/buffer.go @@ -2,6 +2,7 @@ package tools import ( "encoding/binary" + "math" "github.com/localhots/bocadillo/mysql" ) @@ -42,6 +43,11 @@ func (b *Buffer) Cur() []byte { return b.data[b.pos:] } +// More returns true if there's more to read. +func (b *Buffer) More() bool { + return b.pos < len(b.data) +} + // Bytes returns entire buffer contents. func (b *Buffer) Bytes() []byte { return b.data @@ -84,6 +90,22 @@ func (b *Buffer) ReadUintLenEnc() (val uint64, isNull bool, size int) { return } +// ReadVarLen64 reads a number encoded in given size of bytes and advances +// cursor accordingly. +func (b *Buffer) ReadVarLen64(n int) uint64 { + return mysql.DecodeVarLen64(b.Read(n), n) +} + +// ReadFloat32 reads a float32 and advances cursor by 4 bytes. +func (b *Buffer) ReadFloat32() float32 { + return math.Float32frombits(b.ReadUint32()) +} + +// ReadFloat64 reads a float64 and advances cursor by 8 bytes. +func (b *Buffer) ReadFloat64() float64 { + return math.Float64frombits(b.ReadUint64()) +} + // ReadStringNullTerm reads a NULL-terminated string and advances cursor by its // length plus 1 extra byte. func (b *Buffer) ReadStringNullTerm() []byte { @@ -98,6 +120,13 @@ func (b *Buffer) ReadStringVarLen(n int) []byte { return mysql.DecodeStringVarLen(b.Read(n), n) } +// ReadStringVarEnc reads a variable-length length of the string and the string +// itself, then advances cursor by the same number of bytes. +func (b *Buffer) ReadStringVarEnc(n int) []byte { + length := int(mysql.DecodeVarLen64(b.Read(n), n)) + return mysql.DecodeStringVarLen(b.Read(length), length) +} + // ReadStringLenEnc reads a length-encoded string and advances cursor // accordingly. func (b *Buffer) ReadStringLenEnc() (str []byte, size int) {