diff --git a/cmd/main.go b/cmd/main.go index 18ddc1e..93c9e84 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,7 +7,7 @@ import ( "os" "time" - "github.com/localhots/blt" + "github.com/localhots/blt/parser" "github.com/localhots/gobelt/log" ) @@ -22,13 +22,13 @@ func main() { validate((*dsn != ""), "Database source name is not set") validate((*id != 0), "Server ID is not set") validate((*file != ""), "Binary log file is not set") - conf := blt.Config{ + conf := parser.Config{ ServerID: uint32(*id), File: *file, Offset: uint32(*offset), } - reader, err := blt.Connect(*dsn, conf) + reader, err := parser.Connect(*dsn, conf) if err != nil { log.Fatalf(ctx, "Failed to establish connection: %v", err) } diff --git a/event_rows.go b/event_rows.go deleted file mode 100644 index 7e8078d..0000000 --- a/event_rows.go +++ /dev/null @@ -1,64 +0,0 @@ -package blt - -import ( - "fmt" - - "github.com/localhots/pretty" -) - -// Rows contains a Rows Event. -type Rows struct { - EventType EventType - TableID uint64 - Flags uint16 - ExtraData []byte - ColumnCount uint64 - ColumnBitmap1 []byte - ColumnBitmap2 []byte - Rows [][]interface{} -} - -type rowsFlag uint16 - -const ( - rowsFlagEndOfStatement rowsFlag = 0x0001 - rowsFlagNoForeignKeyChecks rowsFlag = 0x0002 - rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004 - rowsFlagRowHasColumns rowsFlag = 0x0008 - - freeTableMapID = 0x00FFFFFF -) - -func (r *Reader) decodeRowsEvent(data []byte, typ EventType) { - // pretty.Println(data) - buf := newReadBuffer(data) - rows := Rows{EventType: typ} - idSize := r.format.tableIDSize(typ) - if idSize == 6 { - rows.TableID = buf.readUint48() - } else { - rows.TableID = uint64(buf.readUint32()) - } - - rows.Flags = buf.readUint16() - - if typ.isEither(WriteRowsEventV2, UpdateRowsEventV2, DeleteRowsEventV2) { - // Extra data length is part of extra data, deduct 2 bytes as they - // already store its length - extraLen := buf.readUint16() - 2 - rows.ExtraData = buf.readStringVarLen(int(extraLen)) - } - - rows.ColumnCount, _ = buf.readUintLenEnc() - rows.ColumnBitmap1 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) - if typ.isEither(UpdateRowsEventV2, UpdateRowsEventV1) { - rows.ColumnBitmap2 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) - } - - tm, ok := r.tableMap[rows.TableID] - if !ok { - panic(fmt.Errorf("Out of sync: no table map definition for ID=%d", rows.TableID)) - } - - pretty.Println(typ.String(), rows, tm, buf.cur()) -} diff --git a/buffer.go b/parser/buffer.go similarity index 99% rename from buffer.go rename to parser/buffer.go index c9a4b5f..0319963 100644 --- a/buffer.go +++ b/parser/buffer.go @@ -1,4 +1,4 @@ -package blt +package parser import ( "encoding/binary" diff --git a/column_types.go b/parser/column_types.go similarity index 99% rename from column_types.go rename to parser/column_types.go index e2409cc..1e84ce4 100644 --- a/column_types.go +++ b/parser/column_types.go @@ -1,4 +1,4 @@ -package blt +package parser import ( "fmt" diff --git a/encoding.go b/parser/encoding.go similarity index 99% rename from encoding.go rename to parser/encoding.go index aaede18..2523554 100644 --- a/encoding.go +++ b/parser/encoding.go @@ -1,4 +1,4 @@ -package blt +package parser import ( "encoding/binary" diff --git a/encoding_test.go b/parser/encoding_test.go similarity index 88% rename from encoding_test.go rename to parser/encoding_test.go index 29c6885..18ce7be 100644 --- a/encoding_test.go +++ b/parser/encoding_test.go @@ -1,4 +1,4 @@ -package blt +package parser import "testing" diff --git a/event_format_description.go b/parser/event_format_description.go similarity index 97% rename from event_format_description.go rename to parser/event_format_description.go index f735d69..0ac42e4 100644 --- a/event_format_description.go +++ b/parser/event_format_description.go @@ -1,11 +1,9 @@ -package blt +package parser import ( "fmt" "strconv" "strings" - - "github.com/localhots/pretty" ) // FormatDescription is a description of binary log format. @@ -55,7 +53,6 @@ func decodeFormatDescription(data []byte) FormatDescription { EventHeaderLength: buf.readUint8(), EventTypeHeaderLengths: buf.readStringEOF(), } - pretty.Println(fd) fd.ServerDetails = ServerDetails{ Flavor: FlavorMySQL, Version: parseVersionNumber(fd.ServerVersion), diff --git a/event_header.go b/parser/event_header.go similarity index 99% rename from event_header.go rename to parser/event_header.go index f4ccc00..940cde2 100644 --- a/event_header.go +++ b/parser/event_header.go @@ -1,4 +1,4 @@ -package blt +package parser import ( "errors" diff --git a/event_rotate.go b/parser/event_rotate.go similarity index 94% rename from event_rotate.go rename to parser/event_rotate.go index cf331f0..b728851 100644 --- a/event_rotate.go +++ b/parser/event_rotate.go @@ -1,4 +1,4 @@ -package blt +package parser func (r *Reader) decodeRotateEvent(data []byte) Position { buf := newReadBuffer(data) diff --git a/parser/event_rows.go b/parser/event_rows.go new file mode 100644 index 0000000..a689cb7 --- /dev/null +++ b/parser/event_rows.go @@ -0,0 +1,169 @@ +package parser + +import ( + "fmt" + + "github.com/localhots/pretty" +) + +// Rows contains a Rows Event. +type Rows struct { + EventType EventType + TableID uint64 + Flags uint16 + ExtraData []byte + ColumnCount uint64 + ColumnBitmap1 []byte + ColumnBitmap2 []byte + Rows [][]interface{} + TableMap *TableMap +} + +type rowsFlag uint16 + +const ( + rowsFlagEndOfStatement rowsFlag = 0x0001 + rowsFlagNoForeignKeyChecks rowsFlag = 0x0002 + rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004 + rowsFlagRowHasColumns rowsFlag = 0x0008 + + freeTableMapID = 0x00FFFFFF +) + +func (r *Reader) decodeRowsEvent(data []byte, typ EventType) { + // pretty.Println(data) + buf := newReadBuffer(data) + rows := Rows{EventType: typ} + idSize := r.format.tableIDSize(typ) + if idSize == 6 { + rows.TableID = buf.readUint48() + } else { + rows.TableID = uint64(buf.readUint32()) + } + + rows.Flags = buf.readUint16() + + if typ.isEither(WriteRowsEventV2, UpdateRowsEventV2, DeleteRowsEventV2) { + // Extra data length is part of extra data, deduct 2 bytes as they + // already store its length + extraLen := buf.readUint16() - 2 + rows.ExtraData = buf.readStringVarLen(int(extraLen)) + } + + rows.ColumnCount, _ = buf.readUintLenEnc() + rows.ColumnBitmap1 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) + if typ.isEither(UpdateRowsEventV2, UpdateRowsEventV1) { + rows.ColumnBitmap2 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) + } + + tm, ok := r.tableMap[rows.TableID] + if !ok { + panic(fmt.Errorf("Out of sync: no table map definition for ID=%d", rows.TableID)) + } + rows.TableMap = &tm + + pretty.Println(typ.String(), rows, tm, buf.cur()) + + rows.decodeRows(buf, rows.ColumnBitmap1) +} + +func (r *Rows) decodeRows(buf *buffer, bm []byte) { + count := 0 + for i := 0; i < int(r.ColumnCount); i++ { + if isBitSet(bm, i) { + count++ + } + } + count = (count + 7) / 8 + + nullBM := buf.readStringVarLen(count) + nullCnt := 0 + row := make([]interface{}, r.ColumnCount) + + pretty.Println(count, nullBM) + + var err error + for i := 0; i < int(r.ColumnCount); i++ { + if !isBitSet(bm, i) { + continue + } + + isNull := (uint32(nullBM[nullCnt/8]) >> uint32(nullCnt%8)) & 0x01 + nullCnt++ + if isNull > 0 { + row[i] = nil + continue + } + + row[i], err = r.decodeValue(buf, columnType(r.TableMap.ColumnTypes[i]), r.TableMap.ColumnMeta[i]) + + if err != nil { + panic(err) + } + } +} + +func (r *Rows) decodeValue(buf *buffer, ct columnType, meta uint16) (interface{}, error) { + switch ct { + case colTypeDecimal: + pretty.Println("Type", ct.String()) + case colTypeTiny: + pretty.Println("Type", ct.String()) + case colTypeShort: + pretty.Println("Type", ct.String()) + case colTypeLong: + pretty.Println("Type", ct.String()) + case colTypeFloat: + pretty.Println("Type", ct.String()) + case colTypeDouble: + pretty.Println("Type", ct.String()) + case colTypeNull: + pretty.Println("Type", ct.String()) + case colTypeTimestamp: + pretty.Println("Type", ct.String()) + case colTypeLonglong: + pretty.Println("Type", ct.String()) + case colTypeInt24: + pretty.Println("Type", ct.String()) + case colTypeDate: + pretty.Println("Type", ct.String()) + case colTypeTime: + pretty.Println("Type", ct.String()) + case colTypeDatetime: + pretty.Println("Type", ct.String()) + case colTypeYear: + pretty.Println("Type", ct.String()) + case colTypeVarchar: + pretty.Println("Type", ct.String()) + case colTypeBit: + pretty.Println("Type", ct.String()) + + case colTypeJSON: + pretty.Println("Type", ct.String()) + case colTypeNewDecimal: + pretty.Println("Type", ct.String()) + case colTypeEnum: + pretty.Println("Type", ct.String()) + case colTypeSet: + pretty.Println("Type", ct.String()) + case colTypeTinyblob: + pretty.Println("Type", ct.String()) + case colTypeMediumblob: + pretty.Println("Type", ct.String()) + case colTypeLongblob: + pretty.Println("Type", ct.String()) + case colTypeBlob: + pretty.Println("Type", ct.String()) + case colTypeVarstring: + pretty.Println("Type", ct.String()) + case colTypeString: + pretty.Println("Type", ct.String()) + case colTypeGeometry: + pretty.Println("Type", ct.String()) + } + return nil, nil +} + +func isBitSet(bm []byte, i int) bool { + return bm[i>>3]&(1<<(uint(i)&7)) > 0 +} diff --git a/event_table_map.go b/parser/event_table_map.go similarity index 99% rename from event_table_map.go rename to parser/event_table_map.go index 709beff..2367f39 100644 --- a/event_table_map.go +++ b/parser/event_table_map.go @@ -1,4 +1,4 @@ -package blt +package parser // TableMap ... type TableMap struct { diff --git a/event_types.go b/parser/event_types.go similarity index 99% rename from event_types.go rename to parser/event_types.go index 6c1fc86..92596d7 100644 --- a/event_types.go +++ b/parser/event_types.go @@ -1,4 +1,4 @@ -package blt +package parser import ( "fmt" diff --git a/reader.go b/parser/reader.go similarity index 99% rename from reader.go rename to parser/reader.go index 907d6aa..29aa27c 100644 --- a/reader.go +++ b/parser/reader.go @@ -1,4 +1,4 @@ -package blt +package parser import ( "context"