1
0
Fork 0

Finish rows parsing

This commit is contained in:
Gregory Eremin 2018-11-08 20:47:49 +01:00
parent 42f22dc377
commit 4d80368814
4 changed files with 207 additions and 75 deletions

View File

@ -1,6 +1,9 @@
package binlog
import (
"fmt"
"time"
"github.com/localhots/bocadillo/mysql"
"github.com/localhots/bocadillo/tools"
"github.com/localhots/pretty"
@ -63,13 +66,28 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri
e.ColumnBitmap2 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8)
}
pretty.Println(e.Type.String(), e, td, buf.Cur())
// pretty.Println(e.Type.String(), buf.Cur())
e.decodeRows(buf, td, e.ColumnBitmap1)
e.Rows = make([][]interface{}, 0)
for buf.More() {
row, err := e.decodeRows(buf, td, e.ColumnBitmap1)
if err != nil {
return err
}
e.Rows = append(e.Rows, row)
if RowsEventHasSecondBitmap(e.Type) {
row, err := e.decodeRows(buf, td, e.ColumnBitmap2)
if err != nil {
return err
}
e.Rows = append(e.Rows, row)
}
}
return nil
}
func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte) {
func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte) ([]interface{}, error) {
count := 0
for i := 0; i < int(e.ColumnCount); i++ {
if isBitSet(bm, i) {
@ -81,8 +99,6 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte
nullBM := buf.ReadStringVarLen(count)
nullIdx := 0
row := make([]interface{}, e.ColumnCount)
var err error
for i := 0; i < int(e.ColumnCount); i++ {
if !isBitSet(bm, i) {
continue
@ -95,73 +111,159 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte
continue
}
row[i], err = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i])
if err != nil {
panic(err)
}
row[i] = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i])
// pretty.Println("PARSED", mysql.ColumnType(td.ColumnTypes[i]).String(), td.ColumnMeta[i], row[i])
}
return row, nil
}
func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) (interface{}, error) {
switch ct {
case mysql.ColumnTypeDecimal:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeTiny:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeShort:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeLong:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeFloat:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeDouble:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeNull:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeTimestamp:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeLonglong:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeInt24:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeDate:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeTime:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeDatetime:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeYear:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeVarchar:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeBit:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeJSON:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeNewDecimal:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeEnum:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeSet:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeTinyblob:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeMediumblob:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeLongblob:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeBlob:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeVarstring:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeString:
pretty.Println("Type", ct.String())
case mysql.ColumnTypeGeometry:
pretty.Println("Type", ct.String())
func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) interface{} {
var length int
if ct == mysql.ColumnTypeString {
if meta > 0xFF {
typeByte := uint8(meta >> 8)
lengthByte := uint8(meta & 0xFF)
if typeByte&0x30 != 0x30 {
ct = mysql.ColumnType(typeByte | 0x30)
length = int(uint16(lengthByte) | (uint16((typeByte&0x30)^0x30) << 4))
} else {
ct = mysql.ColumnType(typeByte)
length = int(lengthByte)
}
} else {
length = int(meta)
}
}
return nil, nil
switch ct {
case mysql.ColumnTypeNull:
return nil
// Integer
case mysql.ColumnTypeTiny:
return buf.ReadUint8()
case mysql.ColumnTypeShort:
return buf.ReadUint16()
case mysql.ColumnTypeInt24:
return buf.ReadUint24()
case mysql.ColumnTypeLong:
return buf.ReadUint32()
case mysql.ColumnTypeLonglong:
return buf.ReadUint64()
// Float
case mysql.ColumnTypeFloat:
return buf.ReadFloat32()
case mysql.ColumnTypeDouble:
return buf.ReadFloat64()
// Decimals
case mysql.ColumnTypeNewDecimal:
precision := int(meta >> 8)
decimals := int(meta & 0xFF)
dec, n := mysql.DecodeDecimal(buf.Cur(), precision, decimals)
buf.Skip(n)
return dec
// Date and Time
case mysql.ColumnTypeYear:
return uint16(buf.ReadUint8()) + 1900
case mysql.ColumnTypeDate:
v := buf.ReadUint24()
if v == 0 {
return "0000-00-00"
}
return fmt.Sprintf("%04d-%02d-%02d", v/(16*32), v/32%16, v%32)
case mysql.ColumnTypeTime:
v := buf.ReadUint24()
if v == 0 {
return "00:00:00"
}
var sign string
if v < 0 {
sign = "-"
}
return fmt.Sprintf("%s%02d:%02d:%02d", sign, v/10000, (v%10000)/100, v%100)
case mysql.ColumnTypeTime2:
v, n := mysql.DecodeTime2(buf.Cur(), meta)
buf.Skip(n)
return v
case mysql.ColumnTypeTimestamp:
ts := buf.ReadUint32()
return mysql.FracTime{Time: time.Unix(int64(ts), 0)}.String()
case mysql.ColumnTypeTimestamp2:
v, n := mysql.DecodeTimestamp2(buf.Cur(), meta)
buf.Skip(n)
return v
case mysql.ColumnTypeDatetime:
v := buf.ReadUint64()
d := v / 1000000
t := v % 1000000
return mysql.FracTime{Time: time.Date(int(d/10000),
time.Month((d%10000)/100),
int(d%100),
int(t/10000),
int((t%10000)/100),
int(t%100),
0,
time.UTC)}.String()
case mysql.ColumnTypeDatetime2:
v, n := mysql.DecodeDatetime2(buf.Cur(), meta)
buf.Skip(n)
return v
// Strings
// FIXME
case mysql.ColumnTypeString:
return readString(buf, length)
case mysql.ColumnTypeVarchar, mysql.ColumnTypeVarstring:
return readString(buf, int(meta))
// Blobs
case mysql.ColumnTypeBlob, mysql.ColumnTypeGeometry, mysql.ColumnTypeJSON:
return buf.ReadStringVarEnc(int(meta))
case mysql.ColumnTypeTinyblob:
return buf.ReadStringVarEnc(1)
case mysql.ColumnTypeMediumblob:
return buf.ReadStringVarEnc(3)
case mysql.ColumnTypeLongblob:
return buf.ReadStringVarEnc(4)
// Bits
case mysql.ColumnTypeBit:
nbits := int(((meta >> 8) * 8) + (meta & 0xFF))
length = int(nbits+7) / 8
return mysql.DecodeBit(buf.Cur(), nbits, length)
case mysql.ColumnTypeSet:
length = int(meta & 0xFF)
nbits := length * 8
return mysql.DecodeBit(buf.Cur(), nbits, length)
// Stuff
case mysql.ColumnTypeEnum:
return buf.ReadVarLen64(int(meta & 0xFF))
// Unsupported
case mysql.ColumnTypeDecimal:
// Old decimal
fallthrough
case mysql.ColumnTypeNewDate:
// Too new
fallthrough
default:
pretty.Printf("UNSUPPORTED Type %d %s %x %x\n", ct, ct.String(), meta, buf.Cur())
}
return nil
}
// FIXME: Something is fishy with this whole string decoding. It seems like it
// could be simplified greatly
func readString(buf *tools.Buffer, length int) string {
if length < 256 {
return string(buf.ReadStringVarEnc(1))
}
return string(buf.ReadStringVarEnc(2))
}
func isBitSet(bm []byte, i int) bool {

View File

@ -4,7 +4,6 @@ import (
"github.com/juju/errors"
"github.com/localhots/bocadillo/binlog"
"github.com/localhots/bocadillo/schema"
"github.com/localhots/pretty"
)
// Reader ...
@ -20,6 +19,8 @@ type Reader struct {
type Event struct {
Header binlog.EventHeader
Body []byte
Table *binlog.TableDescription
Rows *binlog.RowsEvent
}
// NewReader ...
@ -73,13 +74,13 @@ func (r *Reader) ReadEvent() (*Event, error) {
if err = fde.Decode(evt.Body); err == nil {
r.format = fde.FormatDescription
}
pretty.Println(evt.Header.Type.String(), r.format)
// pretty.Println(evt.Header.Type.String(), r.format)
case binlog.EventTypeRotate:
var re binlog.RotateEvent
if err = re.Decode(evt.Body, r.format); err == nil {
r.state = re.NextFile
}
pretty.Println(evt.Header.Type.String(), r.state)
// pretty.Println(evt.Header.Type.String(), r.state)
case binlog.EventTypeTableMap:
var tme binlog.TableMapEvent
if err = tme.Decode(evt.Body, r.format); err == nil {
@ -102,9 +103,11 @@ func (r *Reader) ReadEvent() (*Event, error) {
if !ok {
return nil, errors.New("Unknown table ID")
}
if err = re.Decode(evt.Body, r.format, td); err == nil {
pretty.Println(re)
if err = re.Decode(evt.Body, r.format, td); err != nil {
return nil, err
}
evt.Table = &td
evt.Rows = &re
case binlog.EventTypeXID:
// TODO: Add support
case binlog.EventTypeGTID:

View File

@ -2,7 +2,6 @@ package reader
import (
"database/sql/driver"
"encoding/hex"
"fmt"
"io"
"os"
@ -101,7 +100,6 @@ func (c *SlaveConn) RegisterSlave() error {
// buf.WriteUint32(replicationRank)
// buf.WriteUint32(masterID)
fmt.Println(hex.Dump(buf.Bytes()))
return c.runCmd(buf.Bytes())
}

View File

@ -2,6 +2,7 @@ package tools
import (
"encoding/binary"
"math"
"github.com/localhots/bocadillo/mysql"
)
@ -42,6 +43,11 @@ func (b *Buffer) Cur() []byte {
return b.data[b.pos:]
}
// More returns true if there's more to read.
func (b *Buffer) More() bool {
return b.pos < len(b.data)
}
// Bytes returns entire buffer contents.
func (b *Buffer) Bytes() []byte {
return b.data
@ -84,6 +90,22 @@ func (b *Buffer) ReadUintLenEnc() (val uint64, isNull bool, size int) {
return
}
// ReadVarLen64 reads a number encoded in given size of bytes and advances
// cursor accordingly.
func (b *Buffer) ReadVarLen64(n int) uint64 {
return mysql.DecodeVarLen64(b.Read(n), n)
}
// ReadFloat32 reads a float32 and advances cursor by 4 bytes.
func (b *Buffer) ReadFloat32() float32 {
return math.Float32frombits(b.ReadUint32())
}
// ReadFloat64 reads a float64 and advances cursor by 8 bytes.
func (b *Buffer) ReadFloat64() float64 {
return math.Float64frombits(b.ReadUint64())
}
// ReadStringNullTerm reads a NULL-terminated string and advances cursor by its
// length plus 1 extra byte.
func (b *Buffer) ReadStringNullTerm() []byte {
@ -98,6 +120,13 @@ func (b *Buffer) ReadStringVarLen(n int) []byte {
return mysql.DecodeStringVarLen(b.Read(n), n)
}
// ReadStringVarEnc reads a variable-length length of the string and the string
// itself, then advances cursor by the same number of bytes.
func (b *Buffer) ReadStringVarEnc(n int) []byte {
length := int(mysql.DecodeVarLen64(b.Read(n), n))
return mysql.DecodeStringVarLen(b.Read(length), length)
}
// ReadStringLenEnc reads a length-encoded string and advances cursor
// accordingly.
func (b *Buffer) ReadStringLenEnc() (str []byte, size int) {