From f3961d403c160b36e16008ce08b3a0010fdcb5c6 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 29 Jul 2018 19:20:30 +0200 Subject: [PATCH] Initial commit --- LICENCE | 18 +++ README.md | 21 ++++ buffer.go | 115 +++++++++++++++++++ cmd/main.go | 59 ++++++++++ column_types.go | 112 +++++++++++++++++++ encoding.go | 211 +++++++++++++++++++++++++++++++++++ encoding_test.go | 9 ++ event_format_description.go | 115 +++++++++++++++++++ event_header.go | 92 ++++++++++++++++ event_rotate.go | 13 +++ event_rows.go | 64 +++++++++++ event_table_map.go | 59 ++++++++++ event_types.go | 212 ++++++++++++++++++++++++++++++++++++ reader.go | 197 +++++++++++++++++++++++++++++++++ 14 files changed, 1297 insertions(+) create mode 100644 LICENCE create mode 100644 README.md create mode 100644 buffer.go create mode 100644 cmd/main.go create mode 100644 column_types.go create mode 100644 encoding.go create mode 100644 encoding_test.go create mode 100644 event_format_description.go create mode 100644 event_header.go create mode 100644 event_rotate.go create mode 100644 event_rows.go create mode 100644 event_table_map.go create mode 100644 event_types.go create mode 100644 reader.go diff --git a/LICENCE b/LICENCE new file mode 100644 index 0000000..ba7a548 --- /dev/null +++ b/LICENCE @@ -0,0 +1,18 @@ +Copyright 2018 Gregory Eremin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..ca04dad --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# BLT + +MySQL binary log parser. + +### WIP + +Work in progress, some events are not fully supported. + +*TODO:* + +[x] FormatDescriptionEvent +[x] TableMapEvent +[x] RotateEvent +[ ] RowsEvent +[ ] XIDEvent +[ ] GTIDEvent +[ ] QueryEvent + +### Licence + +MIT \ No newline at end of file diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..c9a4b5f --- /dev/null +++ b/buffer.go @@ -0,0 +1,115 @@ +package blt + +import ( + "encoding/binary" +) + +// buffer is a simple wrapper over a slice of bytes with a cursor. It allows for +// easy command building and results parsing. +type buffer struct { + data []byte + pos int +} + +// skip next n bytes +func (b *buffer) skip(n int) { + b.pos += n +} + +// advance skips next N bytes and returns them +func (b *buffer) advance(n int) []byte { + b.skip(n) + return b.data[b.pos-n:] +} + +// cur returns remaining unread buffer. +func (b *buffer) cur() []byte { + return b.data[b.pos:] +} + +// newReadBuffer creates a buffer with command output. 
+func newReadBuffer(data []byte) *buffer { + return &buffer{data: data} +} + +func (b *buffer) readUint8() uint8 { + return decodeUint8(b.advance(1)) +} + +func (b *buffer) readUint16() uint16 { + return decodeUint16(b.advance(2)) +} + +func (b *buffer) readUint24() uint32 { + return decodeUint24(b.advance(3)) +} + +func (b *buffer) readUint32() uint32 { + return decodeUint32(b.advance(4)) +} + +func (b *buffer) readUint48() uint64 { + return decodeUint48(b.advance(6)) +} + +func (b *buffer) readUint64() uint64 { + return decodeUint64(b.advance(8)) +} + +func (b *buffer) readUintLenEnc() (val uint64, isNull bool) { + var size int + val, isNull, size = decodeUintLenEnc(b.cur()) + b.skip(size) + return +} + +func (b *buffer) readStringNullTerm() []byte { + str := decodeStringNullTerm(b.cur()) + b.skip(len(str) + 1) + return str +} + +func (b *buffer) readStringVarLen(n int) []byte { + return decodeStringVarLen(b.advance(n), n) +} + +func (b *buffer) readStringLenEnc() []byte { + str, size := decodeStringLenEnc(b.cur()) + b.skip(size) + return str +} + +func (b *buffer) readStringEOF() []byte { + return decodeStringEOF(b.cur()) +} + +// Pre-allocate command buffer. First four bytes would be used to set command +// length and sequence number. 
+func newCommandBuffer(size int) *buffer { + return &buffer{data: make([]byte, size+4), pos: 4} +} + +func (b *buffer) writeByte(v byte) { + b.data[b.pos] = v + b.pos++ +} + +func (b *buffer) writeUint16(v uint16) { + binary.LittleEndian.PutUint16(b.data[b.pos:], v) + b.pos += 2 +} + +func (b *buffer) writeUint32(v uint32) { + binary.LittleEndian.PutUint32(b.data[b.pos:], v) + b.pos += 4 +} + +func (b *buffer) writeString(s string) { + b.data[b.pos] = byte(len(s)) + b.pos++ + b.pos += copy(b.data[b.pos:], s) +} + +func (b *buffer) writeStringEOF(s string) { + b.pos += copy(b.data[b.pos:], s) +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..18ddc1e --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "time" + + "github.com/localhots/blt" + "github.com/localhots/gobelt/log" +) + +func main() { + dsn := flag.String("dsn", "", "Database source name") + id := flag.Uint("id", 1000, "Server ID (arbitrary, unique)") + file := flag.String("file", "", "Binary log file name") + offset := flag.Uint("offset", 0, "Log offset in bytes") + flag.Parse() + + ctx := context.Background() + validate((*dsn != ""), "Database source name is not set") + validate((*id != 0), "Server ID is not set") + validate((*file != ""), "Binary log file is not set") + conf := blt.Config{ + ServerID: uint32(*id), + File: *file, + Offset: uint32(*offset), + } + + reader, err := blt.Connect(*dsn, conf) + if err != nil { + log.Fatalf(ctx, "Failed to establish connection: %v", err) + } + + off := conf.Offset + for i := 0; i < 100; i++ { + // for { + evt, err := reader.ReadEventHeader(ctx) + if err != nil { + log.Fatalf(ctx, "Failed to read event: %v", err) + } + ts := time.Unix(int64(evt.Timestamp), 0).Format(time.RFC3339) + log.Info(ctx, "Event received", log.F{ + "type": evt.Type, + "timestamp": ts, + "offset": off, + }) + off = evt.NextOffset + } +} + +func validate(cond bool, msg string) { + if !cond { + 
fmt.Println(msg) + flag.Usage() + os.Exit(2) + } +} diff --git a/column_types.go b/column_types.go new file mode 100644 index 0000000..e2409cc --- /dev/null +++ b/column_types.go @@ -0,0 +1,112 @@ +package blt + +import ( + "fmt" +) + +type columnType byte + +// Spec: https://dev.mysql.com/doc/internals/en/com-query-response.html#column-type +const ( + colTypeDecimal columnType = 0x00 + colTypeTiny columnType = 0x01 + colTypeShort columnType = 0x02 + colTypeLong columnType = 0x03 + colTypeFloat columnType = 0x04 + colTypeDouble columnType = 0x05 + colTypeNull columnType = 0x06 + colTypeTimestamp columnType = 0x07 + colTypeLonglong columnType = 0x08 + colTypeInt24 columnType = 0x09 + colTypeDate columnType = 0x0a + colTypeTime columnType = 0x0b + colTypeDatetime columnType = 0x0c + colTypeYear columnType = 0x0d + colTypeNewDate columnType = 0x0e // Internal + colTypeVarchar columnType = 0x0f + colTypeBit columnType = 0x10 + colTypeTimestamp2 columnType = 0x11 // Internal + colTypeDatetime2 columnType = 0x12 // Internal + colTypeTime2 columnType = 0x13 // Internal + + colTypeJSON columnType = 0xF5 + colTypeNewDecimal columnType = 0xF6 + colTypeEnum columnType = 0xF7 + colTypeSet columnType = 0xF8 + colTypeTinyblob columnType = 0xF9 + colTypeMediumblob columnType = 0xFA + colTypeLongblob columnType = 0xFB + colTypeBlob columnType = 0xFC + colTypeVarstring columnType = 0xFD + colTypeString columnType = 0xFE + colTypeGeometry columnType = 0xFF +) + +func (ct columnType) String() string { + switch ct { + case colTypeDecimal: + return "Decimal" + case colTypeTiny: + return "Tiny" + case colTypeShort: + return "Short" + case colTypeLong: + return "Long" + case colTypeFloat: + return "Float" + case colTypeDouble: + return "Double" + case colTypeNull: + return "Null" + case colTypeTimestamp: + return "Timestamp" + case colTypeLonglong: + return "Longlong" + case colTypeInt24: + return "Int24" + case colTypeDate: + return "Date" + case colTypeTime: + return "Time" + case 
colTypeDatetime: + return "Datetime" + case colTypeYear: + return "Year" + case colTypeNewDate: + return "NewDate" + case colTypeVarchar: + return "Varchar" + case colTypeBit: + return "Bit" + case colTypeTimestamp2: + return "Timestamp2" + case colTypeDatetime2: + return "Datetime2" + case colTypeTime2: + return "Time2" + case colTypeJSON: + return "JSON" + case colTypeNewDecimal: + return "NewDecimal" + case colTypeEnum: + return "Enum" + case colTypeSet: + return "Set" + case colTypeTinyblob: + return "Tinyblob" + case colTypeMediumblob: + return "Mediumblob" + case colTypeLongblob: + return "Longblob" + case colTypeBlob: + return "Blob" + case colTypeVarstring: + return "Varstring" + case colTypeString: + return "String" + case colTypeGeometry: + return "Geometry" + default: + return fmt.Sprintf("Unknown(%d)", ct) + } +} diff --git a/encoding.go b/encoding.go new file mode 100644 index 0000000..aaede18 --- /dev/null +++ b/encoding.go @@ -0,0 +1,211 @@ +package blt + +import ( + "encoding/binary" + "fmt" +) + +// Protocol::FixedLengthInteger +// A fixed-length integer stores its value in a series of bytes with the least +// significant byte first (little endian). 
// Spec: https://dev.mysql.com/doc/internals/en/integer.html#fixed-length-integer

// int<1>

// encodeUint8 stores v in data[0].
func encodeUint8(data []byte, v uint8) {
	data[0] = v
}

// decodeUint8 returns data[0].
func decodeUint8(data []byte) uint8 {
	first := data[0]
	return first
}

// int<2>

// encodeUint16 stores v in data[0:2], least significant byte first.
func encodeUint16(data []byte, v uint16) {
	binary.LittleEndian.PutUint16(data, v)
}

// decodeUint16 reads a little-endian 16-bit integer from data[0:2].
func decodeUint16(data []byte) uint16 {
	return binary.LittleEndian.Uint16(data)
}

// int<3>

// encodeUint24 stores the low three bytes of v in data[0:3], least
// significant byte first. The top byte of v is dropped.
func encodeUint24(data []byte, v uint32) {
	for i := 0; i < 3; i++ {
		data[i] = byte(v >> uint(8*i))
	}
}

// decodeUint24 reads a little-endian 24-bit integer from data[0:3].
func decodeUint24(data []byte) uint32 {
	var v uint32
	for i := 0; i < 3; i++ {
		v |= uint32(data[i]) << uint(8*i)
	}
	return v
}

// int<4>

// encodeUint32 stores v in data[0:4], least significant byte first.
func encodeUint32(data []byte, v uint32) {
	binary.LittleEndian.PutUint32(data, v)
}

// decodeUint32 reads a little-endian 32-bit integer from data[0:4].
func decodeUint32(data []byte) uint32 {
	return binary.LittleEndian.Uint32(data)
}

// int<6>

// encodeUint48 stores the low six bytes of v in data[0:6], least significant
// byte first. The top two bytes of v are dropped.
func encodeUint48(data []byte, v uint64) {
	for i := 0; i < 6; i++ {
		data[i] = byte(v >> uint(8*i))
	}
}

// decodeUint48 reads a little-endian 48-bit integer from data[0:6].
func decodeUint48(data []byte) uint64 {
	var v uint64
	for i := 0; i < 6; i++ {
		v |= uint64(data[i]) << uint(8*i)
	}
	return v
}

// int<8>

// encodeUint64 stores v in data[0:8], least significant byte first.
func encodeUint64(data []byte, v uint64) {
	binary.LittleEndian.PutUint64(data, v)
}

// decodeUint64 reads a little-endian 64-bit integer from data[0:8].
func decodeUint64(data []byte) uint64 {
	return binary.LittleEndian.Uint64(data)
}

// Protocol::LengthEncodedInteger
// An integer that consumes 1, 3, 4, or 9 bytes, depending on its numeric value.
// Spec: https://dev.mysql.com/doc/internals/en/integer.html#length-encoded-integer

// To convert a number value into a length-encoded integer:
// If the value is < 251, it is stored as a 1-byte integer.
// If the value is ≥ 251 and < (2^16), it is stored as 0xFC + 2-byte integer.
// If the value is ≥ (2^16) and < (2^24), it is stored as 0xFD + 3-byte integer.
// If the value is ≥ (2^24) and < (2^64) it is stored as 0xFE + 8-byte integer.
// Note: up to MySQL 3.22, 0xFE was followed by a 4-byte integer.
func encodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) {
	// data must have room for the widest form chosen below (up to 9 bytes);
	// the return value is the number of bytes written.
	switch {
	case isNull:
		// 0xFB on its own is the NULL marker.
		data[0] = 0xFB
		return 1
	case v < 0xFB:
		// Only values up to 250 fit in the first byte. 251 (0xFB) must use
		// the 2-byte form or it would read back as NULL.
		data[0] = byte(v)
		return 1
	case v < 1<<16:
		data[0] = 0xFC
		encodeVarLen64(data[1:], v, 2)
		return 3
	case v < 1<<24:
		data[0] = 0xFD
		encodeVarLen64(data[1:], v, 3)
		return 4
	default:
		data[0] = 0xFE
		encodeVarLen64(data[1:], v, 8)
		return 9
	}
}

// To convert a length-encoded integer into its numeric value, check the first
// byte:
// If it is < 0xFB, treat it as a 1-byte integer.
// If it is 0xFC, it is followed by a 2-byte integer.
// If it is 0xFD, it is followed by a 3-byte integer.
// If it is 0xFE, it is followed by a 8-byte integer.
// Depending on the context, the first byte may also have other meanings:
// If it is 0xFB, it represents a NULL in a ProtocolText::ResultsetRow.
// If it is 0xFF, it may be the first byte of an ERR_Packet.
// Caution:
// If the first byte of a packet is a length-encoded integer and its byte value
// is 0xFE, you must check the length of the packet to verify that it has enough
// space for a 8-byte integer.
// If not, it may be an EOF_Packet instead.
//
// decodeUintLenEnc returns the decoded value, whether the NULL marker was
// found, and the number of bytes consumed. For NULL the returned value is the
// marker byte itself (0xFB) and carries no meaning. A first byte of 0xFF is
// not treated specially and is returned as a 1-byte value.
func decodeUintLenEnc(data []byte) (v uint64, isNull bool, size int) {
	switch data[0] {
	case 0xFB:
		return 0xFB, true, 1
	case 0xFC:
		return decodeVarLen64(data[1:], 2), false, 3
	case 0xFD:
		return decodeVarLen64(data[1:], 3), false, 4
	case 0xFE:
		return decodeVarLen64(data[1:], 8), false, 9
	default:
		return uint64(data[0]), false, 1
	}
}

//
// Variable length encoding helpers
//

// encodeVarLen64 writes the s least significant bytes of v into data[0:s],
// least significant byte first. Higher bytes of v are dropped.
func encodeVarLen64(data []byte, v uint64, s int) {
	for i := 0; i < s; i++ {
		data[i] = byte(v >> uint(i*8))
	}
}

// decodeVarLen64 reads an s-byte little-endian integer from data[0:s].
// data[0] is always read, so data must hold at least one byte.
func decodeVarLen64(data []byte, s int) uint64 {
	v := uint64(data[0])
	for i := 1; i < s; i++ {
		v |= uint64(data[i]) << uint(i*8)
	}
	return v
}

// Protocol::NulTerminatedString
// Strings that are terminated by a 0x00 byte.
+func decodeStringNullTerm(data []byte) []byte { + for i, c := range data { + if c == 0x00 { + s := make([]byte, i+1) + copy(s, data[:i]) + return s + } + } + + s := make([]byte, len(data)) + copy(s, data) + return s +} + +// Protocol::VariableLengthString +// The length of the string is determined by another field or is calculated at +// runtime. +// Protocol::FixedLengthString +// Fixed-length strings have a known, hardcoded length. +func encodeStringVarLen(data, str []byte) { + copy(data, str) +} + +func decodeStringVarLen(data []byte, n int) []byte { + return decodeStringEOF(data[:n]) +} + +// Protocol::LengthEncodedString +// A length encoded string is a string that is prefixed with length encoded +// integer describing the length of the string. +// It is a special case of Protocol::VariableLengthString +func decodeStringLenEnc(data []byte) (str []byte, size int) { + strlen, _, size := decodeUintLenEnc(data) + strleni := int(strlen) + s := make([]byte, strleni) + copy(s, data[size:size+strleni]) + return s, size + strleni +} + +// Protocol::RestOfPacketString +// If a string is the last component of a packet, its length can be calculated +// from the overall packet length minus the current position. 
+func decodeStringEOF(data []byte) []byte { + s := make([]byte, len(data)) + copy(s, data) + return s +} + +func trimString(str []byte) string { + fmt.Println(str, string(str)) + for i, c := range str { + if c == 0x00 { + return string(str[:i]) + } + } + return string(str) +} diff --git a/encoding_test.go b/encoding_test.go new file mode 100644 index 0000000..29c6885 --- /dev/null +++ b/encoding_test.go @@ -0,0 +1,9 @@ +package blt + +import "testing" + +func TestEncodeUint8(t *testing.T) { + buf := make([]byte, 1) + encodeUint8(buf, 123) + t.Log(buf) +} diff --git a/event_format_description.go b/event_format_description.go new file mode 100644 index 0000000..f735d69 --- /dev/null +++ b/event_format_description.go @@ -0,0 +1,115 @@ +package blt + +import ( + "fmt" + "strconv" + "strings" + + "github.com/localhots/pretty" +) + +// FormatDescription is a description of binary log format. +type FormatDescription struct { + Version uint16 + ServerVersion string + CreateTimestamp uint32 + EventHeaderLength uint8 + EventTypeHeaderLengths []uint8 + ServerDetails ServerDetails +} + +// ServerDetails contains server feature details. +type ServerDetails struct { + Flavor Flavor + Version int + ChecksumAlgorithm ChecksumAlgorithm +} + +// Flavor defines the specific kind of MySQL-like database. +type Flavor string + +// ChecksumAlgorithm is a checksum algorithm is the one used by the server. +type ChecksumAlgorithm byte + +const ( + // FlavorMySQL is the MySQL db flavor. + FlavorMySQL = "MySQL" +) + +const ( + // ChecksumAlgorithmNone means no checksum appened. + ChecksumAlgorithmNone ChecksumAlgorithm = 0x00 + // ChecksumAlgorithmCRC32 used to append a 4 byte checksum at the end. + ChecksumAlgorithmCRC32 ChecksumAlgorithm = 0x01 + // ChecksumAlgorithmUndefined is used when checksum algorithm is not known. 
+ ChecksumAlgorithmUndefined ChecksumAlgorithm = 0xFF +) + +// Spec: https://dev.mysql.com/doc/internals/en/format-description-event.html +func decodeFormatDescription(data []byte) FormatDescription { + buf := newReadBuffer(data) + fd := FormatDescription{ + Version: buf.readUint16(), + ServerVersion: string(trimString(buf.readStringVarLen(50))), + CreateTimestamp: buf.readUint32(), + EventHeaderLength: buf.readUint8(), + EventTypeHeaderLengths: buf.readStringEOF(), + } + pretty.Println(fd) + fd.ServerDetails = ServerDetails{ + Flavor: FlavorMySQL, + Version: parseVersionNumber(fd.ServerVersion), + ChecksumAlgorithm: ChecksumAlgorithmUndefined, + } + if fd.ServerDetails.Version > 50601 { + // Last 5 bytes are: + // [1] Checksum algorithm + // [4] Checksum + fd.ServerDetails.ChecksumAlgorithm = ChecksumAlgorithm(data[len(data)-5]) + fd.EventTypeHeaderLengths = fd.EventTypeHeaderLengths[:len(fd.EventTypeHeaderLengths)-5] + } + + return fd +} + +func (fd FormatDescription) tableIDSize(et EventType) int { + if fd.headerLen(et) == 6 { + return 4 + } + return 6 +} + +func (fd FormatDescription) headerLen(et EventType) int { + return int(fd.EventTypeHeaderLengths[et-1]) +} + +func (ca ChecksumAlgorithm) String() string { + switch ca { + case ChecksumAlgorithmNone: + return "None" + case ChecksumAlgorithmCRC32: + return "CRC32" + case ChecksumAlgorithmUndefined: + return "Undefined" + default: + return fmt.Sprintf("Unknown(%d)", ca) + } +} + +// parseVersionNumber turns string version into a number just like the library +// mysql_get_server_version function does. 
+// Example: 5.7.19-log gets represented as 50719 +// Spec: https://dev.mysql.com/doc/refman/8.0/en/mysql-get-server-version.html +func parseVersionNumber(v string) int { + tokens := strings.Split(v, ".") + major, _ := strconv.Atoi(tokens[0]) + minor, _ := strconv.Atoi(tokens[1]) + var patch int + for i, c := range tokens[2] { + if c < '0' || c > '9' { + patch, _ = strconv.Atoi(tokens[2][:i]) + break + } + } + return major*10000 + minor*100 + patch +} diff --git a/event_header.go b/event_header.go new file mode 100644 index 0000000..f4ccc00 --- /dev/null +++ b/event_header.go @@ -0,0 +1,92 @@ +package blt + +import ( + "errors" + + "github.com/localhots/pretty" +) + +var ( + // ErrInvalidHeader is returned when event header cannot be parsed. + ErrInvalidHeader = errors.New("Header is invalid") +) + +// EventHeader represents binlog event header. +type EventHeader struct { + Timestamp uint32 + Type EventType + ServerID uint32 + EventLen uint32 + NextOffset uint32 + Flags uint16 + ExtraHeaders []byte + + eventBody []byte +} + +// Spec: https://dev.mysql.com/doc/internals/en/event-header-fields.html +func (r *Reader) parseHeader(data []byte) (*EventHeader, error) { + headerLen := r.headerLen() + if len(data) < headerLen { + return nil, ErrInvalidHeader + } + // pretty.Println(headerLen, data) + + buf := newReadBuffer(data) + h := &EventHeader{ + Timestamp: buf.readUint32(), + Type: EventType(buf.readUint8()), + ServerID: buf.readUint32(), + EventLen: buf.readUint32(), + } + if r.format.Version == 0 || r.format.Version >= 3 { + h.NextOffset = buf.readUint32() + h.Flags = buf.readUint16() + } + if r.format.Version >= 4 { + h.ExtraHeaders = buf.readStringVarLen(headerLen - 19) + } + h.eventBody = buf.cur() + + if h.NextOffset > 0 { + r.state.Offset = uint64(h.NextOffset) + } + + csa := r.format.ServerDetails.ChecksumAlgorithm + if h.Type != FormatDescriptionEvent && csa == ChecksumAlgorithmCRC32 { + h.eventBody = h.eventBody[:len(h.eventBody)-4] + } + + // 
pretty.Println(h) + + switch h.Type { + case FormatDescriptionEvent: + r.format = decodeFormatDescription(h.eventBody) + pretty.Println(h.Type.String(), r.format) + case RotateEvent: + r.state = r.decodeRotateEvent(h.eventBody) + pretty.Println(h.Type.String(), r.state) + case TableMapEvent: + tm := r.decodeTableMap(h.eventBody) + r.tableMap[tm.TableID] = tm + // pretty.Println(h.Type.String(), tm) + case WriteRowsEventV0, WriteRowsEventV1, WriteRowsEventV2, + UpdateRowsEventV0, UpdateRowsEventV1, UpdateRowsEventV2, + DeleteRowsEventV0, DeleteRowsEventV1, DeleteRowsEventV2: + r.decodeRowsEvent(h.eventBody, h.Type) + case XIDEvent, GTIDEvent: + // TODO: Add support for these too + case QueryEvent: + // TODO: Handle schema changes + } + + return h, nil +} + +func (r *Reader) headerLen() int { + const defaultHeaderLength = 19 + if r.format.EventHeaderLength > 0 { + return int(r.format.EventHeaderLength) + } + return defaultHeaderLength +} diff --git a/event_rotate.go b/event_rotate.go new file mode 100644 index 0000000..cf331f0 --- /dev/null +++ b/event_rotate.go @@ -0,0 +1,13 @@ +package blt + +func (r *Reader) decodeRotateEvent(data []byte) Position { + buf := newReadBuffer(data) + var p Position + if r.format.Version > 1 { + p.Offset = buf.readUint64() + } else { + p.Offset = 4 + } + p.File = string(buf.readStringEOF()) + return p +} diff --git a/event_rows.go b/event_rows.go new file mode 100644 index 0000000..7e8078d --- /dev/null +++ b/event_rows.go @@ -0,0 +1,64 @@ +package blt + +import ( + "fmt" + + "github.com/localhots/pretty" +) + +// Rows contains a Rows Event. 
+type Rows struct { + EventType EventType + TableID uint64 + Flags uint16 + ExtraData []byte + ColumnCount uint64 + ColumnBitmap1 []byte + ColumnBitmap2 []byte + Rows [][]interface{} +} + +type rowsFlag uint16 + +const ( + rowsFlagEndOfStatement rowsFlag = 0x0001 + rowsFlagNoForeignKeyChecks rowsFlag = 0x0002 + rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004 + rowsFlagRowHasColumns rowsFlag = 0x0008 + + freeTableMapID = 0x00FFFFFF +) + +func (r *Reader) decodeRowsEvent(data []byte, typ EventType) { + // pretty.Println(data) + buf := newReadBuffer(data) + rows := Rows{EventType: typ} + idSize := r.format.tableIDSize(typ) + if idSize == 6 { + rows.TableID = buf.readUint48() + } else { + rows.TableID = uint64(buf.readUint32()) + } + + rows.Flags = buf.readUint16() + + if typ.isEither(WriteRowsEventV2, UpdateRowsEventV2, DeleteRowsEventV2) { + // Extra data length is part of extra data, deduct 2 bytes as they + // already store its length + extraLen := buf.readUint16() - 2 + rows.ExtraData = buf.readStringVarLen(int(extraLen)) + } + + rows.ColumnCount, _ = buf.readUintLenEnc() + rows.ColumnBitmap1 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) + if typ.isEither(UpdateRowsEventV2, UpdateRowsEventV1) { + rows.ColumnBitmap2 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) + } + + tm, ok := r.tableMap[rows.TableID] + if !ok { + panic(fmt.Errorf("Out of sync: no table map definition for ID=%d", rows.TableID)) + } + + pretty.Println(typ.String(), rows, tm, buf.cur()) +} diff --git a/event_table_map.go b/event_table_map.go new file mode 100644 index 0000000..709beff --- /dev/null +++ b/event_table_map.go @@ -0,0 +1,59 @@ +package blt + +// TableMap ... 
+type TableMap struct { + TableID uint64 + Flags uint16 + SchemaName string + TableName string + ColumnCount uint64 + ColumnTypes []byte + ColumnMeta []uint16 + NullBitmask []byte +} + +// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html +func (r *Reader) decodeTableMap(data []byte) TableMap { + buf := newReadBuffer(data) + var tm TableMap + idSize := r.format.tableIDSize(TableMapEvent) + if idSize == 6 { + tm.TableID = buf.readUint48() + } else { + tm.TableID = uint64(buf.readUint32()) + } + + tm.Flags = buf.readUint16() + tm.SchemaName = string(buf.readStringLenEnc()) + buf.skip(1) // Always 0x00 + tm.TableName = string(buf.readStringLenEnc()) + buf.skip(1) // Always 0x00 + tm.ColumnCount, _ = buf.readUintLenEnc() + tm.ColumnTypes = buf.readStringVarLen(int(tm.ColumnCount)) + tm.ColumnMeta = decodeColumnMeta(buf.readStringLenEnc(), tm.ColumnTypes) + tm.NullBitmask = buf.readStringVarLen(int(tm.ColumnCount+8) / 7) + + return tm +} + +func decodeColumnMeta(data []byte, cols []byte) []uint16 { + pos := 0 + meta := make([]uint16, len(cols)) + for i, typ := range cols { + switch columnType(typ) { + case colTypeString, colTypeNewDecimal: + // TODO: Is that correct? + meta[i] = uint16(data[pos])<<8 | uint16(data[pos+1]) + pos += 2 + case colTypeVarchar, colTypeVarstring, colTypeBit: + // TODO: Is that correct? + meta[i] = decodeUint16(data[pos:]) + pos += 2 + case colTypeFloat, colTypeDouble, colTypeBlob, colTypeGeometry, colTypeJSON, + colTypeTime2, colTypeDatetime2, colTypeTimestamp2: + meta[i] = uint16(data[pos]) + pos++ + } + } + return meta +} diff --git a/event_types.go b/event_types.go new file mode 100644 index 0000000..6c1fc86 --- /dev/null +++ b/event_types.go @@ -0,0 +1,212 @@ +package blt + +import ( + "fmt" +) + +// EventType defines a binary log event type. +type EventType byte + +// Spec: https://dev.mysql.com/doc/internals/en/event-classes-and-types.html +const ( + // UnknownEvent is an event that should never occur. 
	UnknownEvent EventType = 0
	// StartEventV3 is the Start_event of binlog format 3.
	StartEventV3 EventType = 1
	// QueryEvent is created for each query that modifies the database, unless
	// the query is logged row-based.
	QueryEvent EventType = 2
	// StopEvent is written to the log files under these circumstances:
	// A master writes the event to the binary log when it shuts down.
	// A slave writes the event to the relay log when it shuts down or when a
	// RESET SLAVE statement is executed.
	StopEvent EventType = 3
	// RotateEvent is written at the end of the file that points to the next
	// file in the sequence. It is written when a binary log file exceeds a size
	// limit.
	RotateEvent EventType = 4
	// IntvarEvent will be created just before a Query_event, if the query uses
	// one of the variables LAST_INSERT_ID or INSERT_ID.
	IntvarEvent EventType = 5
	// LoadEvent ...
	LoadEvent EventType = 6
	// SlaveEvent ...
	SlaveEvent EventType = 7
	// CreateFileEvent ...
	CreateFileEvent EventType = 8
	// AppendBlockEvent is created to contain the file data.
	AppendBlockEvent EventType = 9
	// ExecLoadEvent ...
	ExecLoadEvent EventType = 10
	// DeleteFileEvent occurs when the LOAD DATA failed on the master.
	// This event notifies the slave not to do the load and to delete the
	// temporary file.
	DeleteFileEvent EventType = 11
	// NewLoadEvent ...
	NewLoadEvent EventType = 12
	// RandEvent logs random seed used by the next RAND(), and by PASSWORD()
	// in 4.1.0.
	RandEvent EventType = 13
	// UserVarEvent is written every time a statement uses a user variable;
	// precedes other events for the statement. Indicates the value to use for
	// the user variable in the next statement. This is written only before a
	// QUERY_EVENT and is not used with row-based logging.
	UserVarEvent EventType = 14
	// FormatDescriptionEvent is saved by threads which read it, as they need it
	// for future use (to decode the ordinary events).
	FormatDescriptionEvent EventType = 15
	// XIDEvent is generated for a commit of a transaction that modifies one or
	// more tables of an XA-capable storage engine.
	XIDEvent EventType = 16
	// BeginLoadQueryEvent is for the first block of file to be loaded, its only
	// difference from Append_block event is that this event creates or
	// truncates existing file before writing data.
	BeginLoadQueryEvent EventType = 17
	// ExecuteLoadQueryEvent is responsible for LOAD DATA execution. It is
	// similar to Query_event, but before executing the query it substitutes the
	// original filename in the LOAD DATA query with the name of a temporary
	// file.
	ExecuteLoadQueryEvent EventType = 18
	// TableMapEvent is used in row-based mode where it precedes every row
	// operation event and maps a table definition to a number. The table
	// definition consists of database name, table name, and column definitions.
	TableMapEvent EventType = 19
	// WriteRowsEventV0 represents inserted rows. Used in MySQL 5.1.0 to 5.1.15.
	WriteRowsEventV0 EventType = 20
	// UpdateRowsEventV0 represents updated rows. It contains both old and new
	// versions. Used in MySQL 5.1.0 to 5.1.15.
	UpdateRowsEventV0 EventType = 21
	// DeleteRowsEventV0 represents deleted rows. Used in MySQL 5.1.0 to 5.1.15.
	DeleteRowsEventV0 EventType = 22
	// WriteRowsEventV1 represents inserted rows. Used in MySQL 5.1.15 to 5.6.
	WriteRowsEventV1 EventType = 23
	// UpdateRowsEventV1 represents updated rows. It contains both old and new
	// versions. Used in MySQL 5.1.15 to 5.6.
	UpdateRowsEventV1 EventType = 24
	// DeleteRowsEventV1 represents deleted rows. Used in MySQL 5.1.15 to 5.6.
	DeleteRowsEventV1 EventType = 25
	// IncidentEvent represents an incident, an occurrence out of the ordinary,
	// that happened on the master. The event is used to inform the slave that
	// something out of the ordinary happened on the master that might cause the
	// database to be in an inconsistent state.
	IncidentEvent EventType = 26
	// HeartbeetEvent is a replication heartbeat event used to assure the slave
	// that the master is alive. The event is originated by master's dump thread
	// and sent straight to slave without being logged. Slave itself does not
	// store it in relay log but rather uses its data for immediate checks and
	// throws away the event.
	HeartbeetEvent EventType = 27
	// IgnorableEvent is a kind of event that could be ignored.
	IgnorableEvent EventType = 28
	// RowsQueryEvent is a subclass of the IgnorableEvent, to record the
	// original query for the rows events in RBR.
	RowsQueryEvent EventType = 29
	// WriteRowsEventV2 represents inserted rows. Used starting from MySQL 5.6.
	WriteRowsEventV2 EventType = 30
	// UpdateRowsEventV2 represents updated rows. It contains both old and new
	// versions. Used starting from MySQL 5.6.
	UpdateRowsEventV2 EventType = 31
	// DeleteRowsEventV2 represents deleted rows. Used starting from MySQL 5.6.
	DeleteRowsEventV2 EventType = 32
	// GTIDEvent is an event that contains latest GTID.
	// GTID stands for Global Transaction IDentifier. It is composed of two
	// parts:
	// * SID for Source Identifier, and
	// * GNO for Group Number. The basic idea is to associate an identifier, the
	// Global Transaction IDentifier or GTID, to every transaction. When a
	// transaction is copied to a slave, re-executed on the slave, and written
	// to the slave's binary log, the GTID is preserved. When a slave connects
	// to a master, the slave uses GTIDs instead of (file, offset).
	GTIDEvent EventType = 33
	// AnonymousGTIDEvent is a subclass of GTIDEvent.
	AnonymousGTIDEvent EventType = 34
	// PreviousGTIDsEvent is a subclass of GTIDEvent.
+ PreviousGTIDsEvent EventType = 35 +) + +func (et EventType) isEither(types ...EventType) bool { + for _, t := range types { + if et == t { + return true + } + } + return false +} + +func (et EventType) String() string { + switch et { + case UnknownEvent: + return "UnknownEvent" + case StartEventV3: + return "StartEventV3" + case QueryEvent: + return "QueryEvent" + case StopEvent: + return "StopEvent" + case RotateEvent: + return "RotateEvent" + case IntvarEvent: + return "IntvarEvent" + case LoadEvent: + return "LoadEvent" + case SlaveEvent: + return "SlaveEvent" + case CreateFileEvent: + return "CreateFileEvent" + case AppendBlockEvent: + return "AppendBlockEvent" + case ExecLoadEvent: + return "ExecLoadEvent" + case DeleteFileEvent: + return "DeleteFileEvent" + case NewLoadEvent: + return "NewLoadEvent" + case RandEvent: + return "RandEvent" + case UserVarEvent: + return "UserVarEvent" + case FormatDescriptionEvent: + return "FormatDescriptionEvent" + case XIDEvent: + return "XIDEvent" + case BeginLoadQueryEvent: + return "BeginLoadQueryEvent" + case ExecuteLoadQueryEvent: + return "ExecuteLoadQueryEvent" + case TableMapEvent: + return "TableMapEvent" + case WriteRowsEventV0: + return "WriteRowsEventV0" + case UpdateRowsEventV0: + return "UpdateRowsEventV0" + case DeleteRowsEventV0: + return "DeleteRowsEventV0" + case WriteRowsEventV1: + return "WriteRowsEventV1" + case UpdateRowsEventV1: + return "UpdateRowsEventV1" + case DeleteRowsEventV1: + return "DeleteRowsEventV1" + case IncidentEvent: + return "IncidentEvent" + case HeartbeetEvent: + return "HeartbeetEvent" + case IgnorableEvent: + return "IgnorableEvent" + case RowsQueryEvent: + return "RowsQueryEvent" + case WriteRowsEventV2: + return "WriteRowsEventV2" + case UpdateRowsEventV2: + return "UpdateRowsEventV2" + case DeleteRowsEventV2: + return "DeleteRowsEventV2" + case GTIDEvent: + return "GTIDEvent" + case AnonymousGTIDEvent: + return "AnonymousGTIDEvent" + case PreviousGTIDsEvent: + return 
"PreviousGTIDsEvent" + default: + return fmt.Sprintf("Unknown(%d)", et) + } +} diff --git a/reader.go b/reader.go new file mode 100644 index 0000000..907d6aa --- /dev/null +++ b/reader.go @@ -0,0 +1,197 @@ +package blt + +import ( + "context" + "database/sql/driver" + "fmt" + "io" + "os" + + "github.com/juju/errors" + "github.com/localhots/gobelt/log" + "github.com/localhots/mysql" +) + +// Reader ... +type Reader struct { + conn *mysql.ExtendedConn + conf Config + state Position + format FormatDescription + tableMap map[uint64]TableMap +} + +// Config ... +type Config struct { + ServerID uint32 + File string + Offset uint32 + Hostname string +} + +// Position ... +type Position struct { + File string + Offset uint64 +} + +const ( + // Bytes + resultOK byte = 0x00 + resultEOF byte = 0xFE + resultERR byte = 0xFF +) + +// NewReader ... +func NewReader(conn driver.Conn, conf Config) (*Reader, error) { + if conf.Hostname == "" { + name, err := os.Hostname() + if err != nil { + return nil, err + } + conf.Hostname = name + } + + extconn, err := mysql.ExtendConn(conn) + if err != nil { + return nil, err + } + r := &Reader{ + conn: extconn, + conf: conf, + tableMap: make(map[uint64]TableMap), + } + + if err := r.disableChecksum(); err != nil { + return nil, errors.Annotate(err, "Failed to disable binlog checksum") + } + if err := r.registerSlave(); err != nil { + return nil, errors.Annotate(err, "Failed to register slave server") + } + if err := r.binlogDump(); err != nil { + return nil, errors.Annotate(err, "Failed to start binlog dump") + } + + return r, nil +} + +// Connect ... +func Connect(dsn string, conf Config) (*Reader, error) { + conn, err := (&mysql.MySQLDriver{}).Open(dsn) + if err != nil { + return nil, err + } + return NewReader(conn, conf) +} + +// ReadEventHeader reads next event from the log and decodes its header. Header +// is then used to decode the event. 
+func (r *Reader) ReadEventHeader(ctx context.Context) (*EventHeader, error) { + data, err := r.conn.ReadPacket() + if err != nil { + return nil, err + } + switch data[0] { + case resultOK: + return r.parseHeader(data[1:]) + case resultERR: + return nil, r.conn.HandleErrorPacket(data) + case resultEOF: + log.Debug(ctx, "EOF received") + return nil, nil + default: + log.Errorf(ctx, "Unexpected header: %x", data[0]) + return nil, nil + } +} + +// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html +func (r *Reader) registerSlave() error { + const comRegisterSlave byte = 21 + r.conn.ResetSequence() + + buf := newCommandBuffer(1 + 4 + 1 + len(r.conf.Hostname) + 1 + 1 + 2 + 4 + 4) + buf.writeByte(comRegisterSlave) + buf.writeUint32(r.conf.ServerID) + buf.writeString(r.conf.Hostname) + // The rest of the payload would be zeroes, consider following code for + // reference: + // + // buf.writeString(username) + // buf.writeString(password) + // buf.writeUint16(port) + // buf.writeUint32(replicationRank) + // buf.writeUint32(masterID) + + return r.runCmd(buf) +} + +// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html +// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html +func (r *Reader) binlogDump() error { + const comBinlogDump byte = 18 + r.conn.ResetSequence() + + r.state.File = r.conf.File + r.state.Offset = uint64(r.conf.Offset) + // First event offset is 4 + if r.state.Offset < 4 { + r.state.Offset = 4 + } + + buf := newCommandBuffer(1 + 4 + 2 + 4 + len(r.state.File)) + buf.writeByte(comBinlogDump) + buf.writeUint32(uint32(r.state.Offset)) + buf.skip(2) // Flags + buf.writeUint32(r.conf.ServerID) + buf.writeStringEOF(r.state.File) + + return r.runCmd(buf) +} + +func (r *Reader) runCmd(buf *buffer) error { + err := r.conn.WritePacket(buf.data) + if err != nil { + return err + } + return r.conn.ReadResultOK() +} + +func (r *Reader) disableChecksum() error { + cs, err := r.getVar("BINLOG_CHECKSUM") + if err != nil { 
+ return err + } + + if cs != "NONE" { + return r.setVar("@master_binlog_checksum", "NONE") + } + return nil +} + +func (r *Reader) getVar(name string) (string, error) { + rows, err := r.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{}) + if err != nil { + return "", notEOF(err) + } + defer rows.Close() + + res := make([]driver.Value, len(rows.Columns())) + err = rows.Next(res) + if err != nil { + return "", notEOF(err) + } + + return string(res[1].([]byte)), nil +} + +func (r *Reader) setVar(name, val string) error { + return r.conn.Exec(fmt.Sprintf("SET %s=%q", name, val)) +} + +func notEOF(err error) error { + if err == io.EOF { + return nil + } + return err +}