From 5ef06424991b78a81f1717fe5c56725ef2132c84 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Tue, 6 Nov 2018 22:51:56 +0100 Subject: [PATCH] Major refactoring The thing still works somehow --- .../event_format_description.go | 73 ++++-- binlog/event_header.go | 48 ++++ binlog/event_rotate.go | 28 +++ binlog/event_rows.go | 203 +++++++++++++++++ binlog/event_table_map.go | 85 +++++++ binlog/event_types.go | 209 +++++++++++++++++ cmd/main.go | 19 +- mysql/column_types.go | 113 ++++++++++ {parser => mysql}/encoding.go | 89 +++++--- parser/buffer.go | 115 ---------- parser/column_types.go | 112 --------- parser/encoding_test.go | 9 - parser/event_header.go | 92 -------- parser/event_rotate.go | 13 -- parser/event_rows.go | 169 -------------- parser/event_table_map.go | 59 ----- parser/event_types.go | 212 ------------------ parser/reader.go | 197 ---------------- reader/reader.go | 117 ++++++++++ reader/slave_conn.go | 172 ++++++++++++++ tools/buffer.go | 144 ++++++++++++ 21 files changed, 1236 insertions(+), 1042 deletions(-) rename {parser => binlog}/event_format_description.go (59%) create mode 100644 binlog/event_header.go create mode 100644 binlog/event_rotate.go create mode 100644 binlog/event_rows.go create mode 100644 binlog/event_table_map.go create mode 100644 binlog/event_types.go create mode 100644 mysql/column_types.go rename {parser => mysql}/encoding.go (62%) delete mode 100644 parser/buffer.go delete mode 100644 parser/column_types.go delete mode 100644 parser/encoding_test.go delete mode 100644 parser/event_header.go delete mode 100644 parser/event_rotate.go delete mode 100644 parser/event_rows.go delete mode 100644 parser/event_table_map.go delete mode 100644 parser/event_types.go delete mode 100644 parser/reader.go create mode 100644 reader/reader.go create mode 100644 reader/slave_conn.go create mode 100644 tools/buffer.go diff --git a/parser/event_format_description.go b/binlog/event_format_description.go similarity index 59% rename from 
parser/event_format_description.go rename to binlog/event_format_description.go index 0ac42e4..71331e1 100644 --- a/parser/event_format_description.go +++ b/binlog/event_format_description.go @@ -1,9 +1,11 @@ -package parser +package binlog import ( "fmt" "strconv" "strings" + + "github.com/localhots/blt/tools" ) // FormatDescription is a description of binary log format. @@ -23,6 +25,12 @@ type ServerDetails struct { ChecksumAlgorithm ChecksumAlgorithm } +// FormatDescriptionEvent contains server details and binary log format +// description. It is usually the first event in a log file. +type FormatDescriptionEvent struct { + FormatDescription +} + // Flavor defines the specific kind of MySQL-like database. type Flavor string @@ -32,9 +40,7 @@ type ChecksumAlgorithm byte const ( // FlavorMySQL is the MySQL db flavor. FlavorMySQL = "MySQL" -) -const ( // ChecksumAlgorithmNone means no checksum appened. ChecksumAlgorithmNone ChecksumAlgorithm = 0x00 // ChecksumAlgorithmCRC32 used to append a 4 byte checksum at the end. @@ -43,43 +49,53 @@ const ( ChecksumAlgorithmUndefined ChecksumAlgorithm = 0xFF ) +// Decode decodes given buffer into a format description event. 
// Spec: https://dev.mysql.com/doc/internals/en/format-description-event.html -func decodeFormatDescription(data []byte) FormatDescription { - buf := newReadBuffer(data) - fd := FormatDescription{ - Version: buf.readUint16(), - ServerVersion: string(trimString(buf.readStringVarLen(50))), - CreateTimestamp: buf.readUint32(), - EventHeaderLength: buf.readUint8(), - EventTypeHeaderLengths: buf.readStringEOF(), - } - fd.ServerDetails = ServerDetails{ +func (e *FormatDescriptionEvent) Decode(data []byte) error { + buf := tools.NewBuffer(data) + e.Version = buf.ReadUint16() + e.ServerVersion = trimString(buf.ReadStringVarLen(50)) + e.CreateTimestamp = buf.ReadUint32() + e.EventHeaderLength = buf.ReadUint8() + e.EventTypeHeaderLengths = buf.ReadStringEOF() + e.ServerDetails = ServerDetails{ Flavor: FlavorMySQL, - Version: parseVersionNumber(fd.ServerVersion), + Version: parseVersionNumber(e.ServerVersion), ChecksumAlgorithm: ChecksumAlgorithmUndefined, } - if fd.ServerDetails.Version > 50601 { + if e.ServerDetails.Version > 50601 { // Last 5 bytes are: // [1] Checksum algorithm // [4] Checksum - fd.ServerDetails.ChecksumAlgorithm = ChecksumAlgorithm(data[len(data)-5]) - fd.EventTypeHeaderLengths = fd.EventTypeHeaderLengths[:len(fd.EventTypeHeaderLengths)-5] + e.ServerDetails.ChecksumAlgorithm = ChecksumAlgorithm(data[len(data)-5]) + e.EventTypeHeaderLengths = e.EventTypeHeaderLengths[:len(e.EventTypeHeaderLengths)-5] } - return fd + return nil } -func (fd FormatDescription) tableIDSize(et EventType) int { - if fd.headerLen(et) == 6 { +// HeaderLen returns length of event header. +func (fd FormatDescription) HeaderLen() int { + const defaultHeaderLength = 19 + if fd.EventHeaderLength > 0 { + return int(fd.EventHeaderLength) + } + return defaultHeaderLength +} + +// PostHeaderLen returns length of a post-header for a given event type. 
+func (fd FormatDescription) PostHeaderLen(et EventType) int { + return int(fd.EventTypeHeaderLengths[et-1]) +} + +// TableIDSize returns table ID size for a given event type. +func (fd FormatDescription) TableIDSize(et EventType) int { + if fd.PostHeaderLen(et) == 6 { return 4 } return 6 } -func (fd FormatDescription) headerLen(et EventType) int { - return int(fd.EventTypeHeaderLengths[et-1]) -} - func (ca ChecksumAlgorithm) String() string { switch ca { case ChecksumAlgorithmNone: @@ -110,3 +126,12 @@ func parseVersionNumber(v string) int { } return major*10000 + minor*100 + patch } + +func trimString(str []byte) string { + for i, c := range str { + if c == 0x00 { + return string(str[:i]) + } + } + return string(str) +} diff --git a/binlog/event_header.go b/binlog/event_header.go new file mode 100644 index 0000000..4e9bb95 --- /dev/null +++ b/binlog/event_header.go @@ -0,0 +1,48 @@ +package binlog + +import ( + "errors" + + "github.com/localhots/blt/tools" +) + +var ( + // ErrInvalidHeader is returned when event header cannot be parsed. + ErrInvalidHeader = errors.New("Header is invalid") +) + +// EventHeader represents binlog event header. +type EventHeader struct { + Timestamp uint32 + Type EventType + ServerID uint32 + EventLen uint32 + NextOffset uint32 + Flags uint16 + ExtraHeaders []byte +} + +// Decode decodes given buffer into event header. 
+// Spec: https://dev.mysql.com/doc/internals/en/event-header-fields.html +func (h *EventHeader) Decode(connBuff []byte, fd FormatDescription) error { + headerLen := fd.HeaderLen() + if len(connBuff) < headerLen { + return ErrInvalidHeader + } + + buf := tools.NewBuffer(connBuff) + h.Timestamp = buf.ReadUint32() + h.Type = EventType(buf.ReadUint8()) + h.ServerID = buf.ReadUint32() + h.EventLen = buf.ReadUint32() + + if fd.Version == 0 || fd.Version >= 3 { + h.NextOffset = buf.ReadUint32() + h.Flags = buf.ReadUint16() + } + if fd.Version >= 4 { + h.ExtraHeaders = buf.ReadStringVarLen(headerLen - 19) + } + + return nil +} diff --git a/binlog/event_rotate.go b/binlog/event_rotate.go new file mode 100644 index 0000000..2b2e389 --- /dev/null +++ b/binlog/event_rotate.go @@ -0,0 +1,28 @@ +package binlog + +import "github.com/localhots/blt/tools" + +// Position describes a location in a binary log: a file name and an offset. +type Position struct { + File string + Offset uint64 +} + +// RotateEvent is written at the end of the file that points to the next file in +// the sequence. It is written when a binary log file exceeds a size limit. +type RotateEvent struct { + NextFile Position +} + +// Decode decodes given buffer into a rotate event. +// Spec: https://dev.mysql.com/doc/internals/en/rotate-event.html +func (e *RotateEvent) Decode(connBuff []byte, fd FormatDescription) error { + buf := tools.NewBuffer(connBuff) + if fd.Version > 1 { + e.NextFile.Offset = buf.ReadUint64() + } else { + e.NextFile.Offset = 4 + } + e.NextFile.File = string(buf.ReadStringEOF()) + return nil +} diff --git a/binlog/event_rows.go b/binlog/event_rows.go new file mode 100644 index 0000000..153c97e --- /dev/null +++ b/binlog/event_rows.go @@ -0,0 +1,203 @@ +package binlog + +import ( + "github.com/localhots/blt/mysql" + "github.com/localhots/blt/tools" + "github.com/localhots/pretty" +) + +// RowsEvent contains a Rows Event. 
+type RowsEvent struct { + Type EventType + TableID uint64 + Flags uint16 + ExtraData []byte + ColumnCount uint64 + ColumnBitmap1 []byte + ColumnBitmap2 []byte + Rows [][]interface{} +} + +type rowsFlag uint16 + +const ( + rowsFlagEndOfStatement rowsFlag = 0x0001 + rowsFlagNoForeignKeyChecks rowsFlag = 0x0002 + rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004 + rowsFlagRowHasColumns rowsFlag = 0x0008 + + freeTableMapID = 0x00FFFFFF +) + +// PeekTableID returns table ID without decoding whole event. +func (e *RowsEvent) PeekTableID(connBuff []byte, fd FormatDescription) uint64 { + if fd.TableIDSize(e.Type) == 6 { + return mysql.DecodeUint48(connBuff) + } + return uint64(mysql.DecodeUint32(connBuff)) +} + +// Decode decodes given buffer into a rows event. +func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescription) error { + // pretty.Println(data) + buf := tools.NewBuffer(connBuff) + idSize := fd.TableIDSize(e.Type) + if idSize == 6 { + e.TableID = buf.ReadUint48() + } else { + e.TableID = uint64(buf.ReadUint32()) + } + + e.Flags = buf.ReadUint16() + + if RowsEventHasExtraData(e.Type) { + // Extra data length is part of extra data, deduct 2 bytes as they + // already store its length + extraLen := buf.ReadUint16() - 2 + e.ExtraData = buf.ReadStringVarLen(int(extraLen)) + } + + e.ColumnCount, _, _ = buf.ReadUintLenEnc() + e.ColumnBitmap1 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8) + if RowsEventHasSecondBitmap(e.Type) { + e.ColumnBitmap2 = buf.ReadStringVarLen(int(e.ColumnCount+7) / 8) + } + + pretty.Println(e.Type.String(), e, td, buf.Cur()) + + e.decodeRows(buf, td, e.ColumnBitmap1) + return nil +} + +func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte) { + count := 0 + for i := 0; i < int(e.ColumnCount); i++ { + if isBitSet(bm, i) { + count++ + } + } + count = (count + 7) / 8 + + nullBM := buf.ReadStringVarLen(count) + nullCnt := 0 + row := make([]interface{}, e.ColumnCount) + + pretty.Println(count, 
nullBM) + + var err error + for i := 0; i < int(e.ColumnCount); i++ { + if !isBitSet(bm, i) { + continue + } + + isNull := (uint32(nullBM[nullCnt/8]) >> uint32(nullCnt%8)) & 0x01 + nullCnt++ + if isNull > 0 { + row[i] = nil + continue + } + + row[i], err = e.decodeValue(buf, mysql.ColumnType(td.ColumnTypes[i]), td.ColumnMeta[i]) + + if err != nil { + panic(err) + } + } +} + +func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) (interface{}, error) { + switch ct { + case mysql.ColumnTypeDecimal: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeTiny: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeShort: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeLong: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeFloat: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeDouble: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeNull: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeTimestamp: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeLonglong: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeInt24: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeDate: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeTime: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeDatetime: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeYear: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeVarchar: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeBit: + pretty.Println("Type", ct.String()) + + case mysql.ColumnTypeJSON: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeNewDecimal: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeEnum: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeSet: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeTinyblob: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeMediumblob: + 
pretty.Println("Type", ct.String()) + case mysql.ColumnTypeLongblob: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeBlob: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeVarstring: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeString: + pretty.Println("Type", ct.String()) + case mysql.ColumnTypeGeometry: + pretty.Println("Type", ct.String()) + } + return nil, nil +} + +func isBitSet(bm []byte, i int) bool { + return bm[i>>3]&(1<<(uint(i)&7)) > 0 +} + +// RowsEventVersion returns rows event versions. If event is not a rows type -1 +// is returned. +func RowsEventVersion(et EventType) int { + switch et { + case EventTypeWriteRowsV0, EventTypeUpdateRowsV0, EventTypeDeleteRowsV0: + return 0 + case EventTypeWriteRowsV1, EventTypeUpdateRowsV1, EventTypeDeleteRowsV1: + return 1 + case EventTypeWriteRowsV2, EventTypeUpdateRowsV2, EventTypeDeleteRowsV2: + return 2 + default: + return -1 + } +} + +// RowsEventHasExtraData returns true if given event is of rows type and +// contains extra data. +func RowsEventHasExtraData(et EventType) bool { + return RowsEventVersion(et) == 2 +} + +// RowsEventHasSecondBitmap returns true if given event is of rows type and +// contains a second bitmap. +func RowsEventHasSecondBitmap(et EventType) bool { + switch et { + case EventTypeUpdateRowsV1, EventTypeUpdateRowsV2: + return true + default: + return false + } +} diff --git a/binlog/event_table_map.go b/binlog/event_table_map.go new file mode 100644 index 0000000..81c1446 --- /dev/null +++ b/binlog/event_table_map.go @@ -0,0 +1,85 @@ +package binlog + +import ( + "github.com/localhots/blt/mysql" + "github.com/localhots/blt/tools" +) + +// TableDescription contains table details required to process rows events. 
+type TableDescription struct { + Flags uint16 + SchemaName string + TableName string + ColumnCount uint64 + ColumnTypes []byte + ColumnMeta []uint16 + NullBitmask []byte +} + +// TableMapEvent contains table description alongside an ID that would be used +// to reference the table in the following rows events. +type TableMapEvent struct { + TableID uint64 + TableDescription +} + +// Decode decodes given buffer into a table map event. +// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html +func (e *TableMapEvent) Decode(connBuff []byte, fd FormatDescription) error { + buf := tools.NewBuffer(connBuff) + idSize := fd.TableIDSize(EventTypeTableMap) + if idSize == 6 { + e.TableID = buf.ReadUint48() + } else { + e.TableID = uint64(buf.ReadUint32()) + } + + e.Flags = buf.ReadUint16() + schemaName, _ := buf.ReadStringLenEnc() + e.SchemaName = string(schemaName) + buf.Skip(1) // Always 0x00 + tableName, _ := buf.ReadStringLenEnc() + e.TableName = string(tableName) + buf.Skip(1) // Always 0x00 + e.ColumnCount, _, _ = buf.ReadUintLenEnc() + e.ColumnTypes = buf.ReadStringVarLen(int(e.ColumnCount)) + colMeta, _ := buf.ReadStringLenEnc() + e.ColumnMeta = decodeColumnMeta(colMeta, e.ColumnTypes) + e.NullBitmask = buf.ReadStringVarLen(int(e.ColumnCount+8) / 7) + + return nil +} + +func decodeColumnMeta(data []byte, cols []byte) []uint16 { + pos := 0 + meta := make([]uint16, len(cols)) + for i, typ := range cols { + switch mysql.ColumnType(typ) { + case mysql.ColumnTypeString, + mysql.ColumnTypeNewDecimal: + + // TODO: Is that correct? + meta[i] = uint16(data[pos])<<8 | uint16(data[pos+1]) + pos += 2 + case mysql.ColumnTypeVarchar, + mysql.ColumnTypeVarstring, + mysql.ColumnTypeBit: + + // TODO: Is that correct? 
+ meta[i] = mysql.DecodeUint16(data[pos:]) + pos += 2 + case mysql.ColumnTypeFloat, + mysql.ColumnTypeDouble, + mysql.ColumnTypeBlob, + mysql.ColumnTypeGeometry, + mysql.ColumnTypeJSON, + mysql.ColumnTypeTime2, + mysql.ColumnTypeDatetime2, + mysql.ColumnTypeTimestamp2: + + meta[i] = uint16(data[pos]) + pos++ + } + } + return meta +} diff --git a/binlog/event_types.go b/binlog/event_types.go new file mode 100644 index 0000000..c94fcf4 --- /dev/null +++ b/binlog/event_types.go @@ -0,0 +1,209 @@ +package binlog + +import ( + "fmt" +) + +// EventType defines a binary log event type. +type EventType byte + +// Spec: https://dev.mysql.com/doc/internals/en/event-classes-and-types.html +const ( + // EventTypeUnknown is an event that should never occur. + EventTypeUnknown EventType = 0 + // EventTypeStartV3 is the Start_event of binlog format 3. + EventTypeStartV3 EventType = 1 + // EventTypeQuery is created for each query that modifies the database, + // unless the query is logged row-based. + EventTypeQuery EventType = 2 + // EventTypeStop is written to the log files under these circumstances: + // A master writes the event to the binary log when it shuts down. + // A slave writes the event to the relay log when it shuts down or when a + // RESET SLAVE statement is executed. + EventTypeStop EventType = 3 + // EventTypeRotate is written at the end of the file that points to the next + // file in the sequence. It is written when a binary log file exceeds a size + // limit. + EventTypeRotate EventType = 4 + // EventTypeIntvar will be created just before a Query_event, if the query + // uses one of the variables LAST_INSERT_ID or INSERT_ID. + EventTypeIntvar EventType = 5 + // EventTypeLoad ... + EventTypeLoad EventType = 6 + // EventTypeSlave ... + EventTypeSlave EventType = 7 + // EventTypeCreateFile ... + EventTypeCreateFile EventType = 8 + // EventTypeAppendBlock is created to contain the file data. + EventTypeAppendBlock EventType = 9 + // EventTypeExecLoad ... 
+ EventTypeExecLoad EventType = 10 + // EventTypeDeleteFile occurs when the LOAD DATA failed on the master. + // This event notifies the slave not to do the load and to delete the + // temporary file. + EventTypeDeleteFile EventType = 11 + // EventTypeNewLoad ... + EventTypeNewLoad EventType = 12 + // EventTypeRand logs random seed used by the next RAND(), and by PASSWORD() + // in 4.1.0. + EventTypeRand EventType = 13 + // EventTypeUserVar is written every time a statement uses a user variable; + // precedes other events for the statement. Indicates the value to use for + // the user variable in the next statement. This is written only before a + // QUERY_EVENT and is not used with row-based logging. + EventTypeUserVar EventType = 14 + // EventTypeFormatDescription is saved by threads which read it, as they + // need it for future use (to decode the ordinary events). + EventTypeFormatDescription EventType = 15 + // EventTypeXID is generated for a commit of a transaction that modifies one + // or more tables of an XA-capable storage engine. + EventTypeXID EventType = 16 + // EventTypeBeginLoadQuery is for the first block of file to be loaded, its + // only difference from Append_block event is that this event creates or + // truncates existing file before writing data. + EventTypeBeginLoadQuery EventType = 17 + // EventTypeExecuteLoadQuery is responsible for LOAD DATA execution. It is + // similar to Query_event, but before executing the query it substitutes the + // original filename in the LOAD DATA query with the name of a temporary file. + EventTypeExecuteLoadQuery EventType = 18 + // EventTypeTableMap is used in row-based mode where it precedes every row + // operation event and maps a table definition to a number. The table + // definition consists of database name, table name, and column definitions. + EventTypeTableMap EventType = 19 + // EventTypeWriteRowsV0 represents inserted rows. Used in MySQL 5.1.0 to + // 5.1.15. 
+ EventTypeWriteRowsV0 EventType = 20 + // EventTypeUpdateRowsV0 represents updated rows. It contains both old and + // new versions. Used in MySQL 5.1.0 to 5.1.15. + EventTypeUpdateRowsV0 EventType = 21 + // EventTypeDeleteRowsV0 represents deleted rows. Used in MySQL 5.1.0 to + // 5.1.15. + EventTypeDeleteRowsV0 EventType = 22 + // EventTypeWriteRowsV1 represents inserted rows. Used in MySQL 5.1.15 to + // 5.6. + EventTypeWriteRowsV1 EventType = 23 + // EventTypeUpdateRowsV1 represents updated rows. It contains both old and + // new versions. Used in MySQL 5.1.15 to 5.6. + EventTypeUpdateRowsV1 EventType = 24 + // EventTypeDeleteRowsV1 represents deleted rows. Used in MySQL 5.1.15 to + // 5.6. + EventTypeDeleteRowsV1 EventType = 25 + // EventTypeIncident represents an incident, an occurrence out of the + // ordinary, that happened on the master. The event is used to inform the + // slave that something out of the ordinary happened on the master that + // might cause the database to be in an inconsistent state. + EventTypeIncident EventType = 26 + // EventTypeHeartbeet is a replication event used to assure the slave that + // the master is alive. The event originates from the master's dump thread + // and is sent straight to the slave without being logged. The slave does + // not store it in the relay log but rather uses its data for immediate + // checks and throws the event away. + EventTypeHeartbeet EventType = 27 + // EventTypeIgnorable is a kind of event that could be ignored. + EventTypeIgnorable EventType = 28 + // EventTypeRowsQuery is a subclass of the IgnorableEvent, to record the + // original query for the rows events in RBR. + EventTypeRowsQuery EventType = 29 + // EventTypeWriteRowsV2 represents inserted rows. Used starting from MySQL + // 5.6. + EventTypeWriteRowsV2 EventType = 30 + // EventTypeUpdateRowsV2 represents updated rows. It contains both old and + // new versions. Used starting from MySQL 5.6. 
+ EventTypeUpdateRowsV2 EventType = 31 + // EventTypeDeleteRowsV2 represents deleted rows. Used starting from MySQL + // 5.6. + EventTypeDeleteRowsV2 EventType = 32 + // EventTypeGTID is an event that contains latest GTID. + // GTID stands for Global Transaction IDentifier It is composed of two + // parts: + // * SID for Source Identifier, and + // * GNO for Group Number. The basic idea is to associate an identifier, the + // Global Transaction IDentifier or GTID, to every transaction. When a + // transaction is copied to a slave, re-executed on the slave, and written + // to the slave's binary log, the GTID is preserved. When a slave connects + // to a master, the slave uses GTIDs instead of (file, offset). + EventTypeGTID EventType = 33 + // EventTypeAnonymousGTID is a subclass of GTIDEvent. + EventTypeAnonymousGTID EventType = 34 + // EventTypePreviousGTIDs is a subclass of GTIDEvent. + EventTypePreviousGTIDs EventType = 35 +) + +func (et EventType) String() string { + switch et { + case EventTypeUnknown: + return "UnknownEvent" + case EventTypeStartV3: + return "StartEventV3" + case EventTypeQuery: + return "QueryEvent" + case EventTypeStop: + return "StopEvent" + case EventTypeRotate: + return "RotateEvent" + case EventTypeIntvar: + return "IntvarEvent" + case EventTypeLoad: + return "LoadEvent" + case EventTypeSlave: + return "SlaveEvent" + case EventTypeCreateFile: + return "CreateFileEvent" + case EventTypeAppendBlock: + return "AppendBlockEvent" + case EventTypeExecLoad: + return "ExecLoadEvent" + case EventTypeDeleteFile: + return "DeleteFileEvent" + case EventTypeNewLoad: + return "NewLoadEvent" + case EventTypeRand: + return "RandEvent" + case EventTypeUserVar: + return "UserVarEvent" + case EventTypeFormatDescription: + return "FormatDescriptionEvent" + case EventTypeXID: + return "XIDEvent" + case EventTypeBeginLoadQuery: + return "BeginLoadQueryEvent" + case EventTypeExecuteLoadQuery: + return "ExecuteLoadQueryEvent" + case EventTypeTableMap: + 
return "TableMapEvent" + case EventTypeWriteRowsV0: + return "WriteRowsEventV0" + case EventTypeUpdateRowsV0: + return "UpdateRowsEventV0" + case EventTypeDeleteRowsV0: + return "DeleteRowsEventV0" + case EventTypeWriteRowsV1: + return "WriteRowsEventV1" + case EventTypeUpdateRowsV1: + return "UpdateRowsEventV1" + case EventTypeDeleteRowsV1: + return "DeleteRowsEventV1" + case EventTypeIncident: + return "IncidentEvent" + case EventTypeHeartbeet: + return "HeartbeetEvent" + case EventTypeIgnorable: + return "IgnorableEvent" + case EventTypeRowsQuery: + return "RowsQueryEvent" + case EventTypeWriteRowsV2: + return "WriteRowsEventV2" + case EventTypeUpdateRowsV2: + return "UpdateRowsEventV2" + case EventTypeDeleteRowsV2: + return "DeleteRowsEventV2" + case EventTypeGTID: + return "GTIDEvent" + case EventTypeAnonymousGTID: + return "AnonymousGTIDEvent" + case EventTypePreviousGTIDs: + return "PreviousGTIDsEvent" + default: + return fmt.Sprintf("Unknown(%d)", et) + } +} diff --git a/cmd/main.go b/cmd/main.go index 93c9e84..a387465 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,7 +7,7 @@ import ( "os" "time" - "github.com/localhots/blt/parser" + "github.com/localhots/blt/reader" "github.com/localhots/gobelt/log" ) @@ -22,31 +22,36 @@ func main() { validate((*dsn != ""), "Database source name is not set") validate((*id != 0), "Server ID is not set") validate((*file != ""), "Binary log file is not set") - conf := parser.Config{ + conf := reader.Config{ ServerID: uint32(*id), File: *file, Offset: uint32(*offset), } - reader, err := parser.Connect(*dsn, conf) + conn, err := reader.Connect(*dsn, conf) if err != nil { log.Fatalf(ctx, "Failed to establish connection: %v", err) } + reader, err := reader.NewReader(conn) + if err != nil { + log.Fatalf(ctx, "Failed to create reader: %v", err) + } + off := conf.Offset for i := 0; i < 100; i++ { // for { - evt, err := reader.ReadEventHeader(ctx) + evt, err := reader.ReadEvent() if err != nil { log.Fatalf(ctx, "Failed to read 
event: %v", err) } - ts := time.Unix(int64(evt.Timestamp), 0).Format(time.RFC3339) + ts := time.Unix(int64(evt.Header.Timestamp), 0).Format(time.RFC3339) log.Info(ctx, "Event received", log.F{ - "type": evt.Type, + "type": evt.Header.Type, "timestamp": ts, "offset": off, }) - off = evt.NextOffset + off = evt.Header.NextOffset } } diff --git a/mysql/column_types.go b/mysql/column_types.go new file mode 100644 index 0000000..2906260 --- /dev/null +++ b/mysql/column_types.go @@ -0,0 +1,113 @@ +package mysql + +import ( + "fmt" +) + +// ColumnType represents MySQL column type. +type ColumnType byte + +// Spec: https://dev.mysql.com/doc/internals/en/com-query-response.html#column-type +const ( + ColumnTypeDecimal ColumnType = 0x00 + ColumnTypeTiny ColumnType = 0x01 + ColumnTypeShort ColumnType = 0x02 + ColumnTypeLong ColumnType = 0x03 + ColumnTypeFloat ColumnType = 0x04 + ColumnTypeDouble ColumnType = 0x05 + ColumnTypeNull ColumnType = 0x06 + ColumnTypeTimestamp ColumnType = 0x07 + ColumnTypeLonglong ColumnType = 0x08 + ColumnTypeInt24 ColumnType = 0x09 + ColumnTypeDate ColumnType = 0x0a + ColumnTypeTime ColumnType = 0x0b + ColumnTypeDatetime ColumnType = 0x0c + ColumnTypeYear ColumnType = 0x0d + ColumnTypeNewDate ColumnType = 0x0e // Internal + ColumnTypeVarchar ColumnType = 0x0f + ColumnTypeBit ColumnType = 0x10 + ColumnTypeTimestamp2 ColumnType = 0x11 // Internal + ColumnTypeDatetime2 ColumnType = 0x12 // Internal + ColumnTypeTime2 ColumnType = 0x13 // Internal + + ColumnTypeJSON ColumnType = 0xF5 + ColumnTypeNewDecimal ColumnType = 0xF6 + ColumnTypeEnum ColumnType = 0xF7 + ColumnTypeSet ColumnType = 0xF8 + ColumnTypeTinyblob ColumnType = 0xF9 + ColumnTypeMediumblob ColumnType = 0xFA + ColumnTypeLongblob ColumnType = 0xFB + ColumnTypeBlob ColumnType = 0xFC + ColumnTypeVarstring ColumnType = 0xFD + ColumnTypeString ColumnType = 0xFE + ColumnTypeGeometry ColumnType = 0xFF +) + +func (ct ColumnType) String() string { + switch ct { + case ColumnTypeDecimal: + return 
"Decimal" + case ColumnTypeTiny: + return "Tiny" + case ColumnTypeShort: + return "Short" + case ColumnTypeLong: + return "Long" + case ColumnTypeFloat: + return "Float" + case ColumnTypeDouble: + return "Double" + case ColumnTypeNull: + return "Null" + case ColumnTypeTimestamp: + return "Timestamp" + case ColumnTypeLonglong: + return "Longlong" + case ColumnTypeInt24: + return "Int24" + case ColumnTypeDate: + return "Date" + case ColumnTypeTime: + return "Time" + case ColumnTypeDatetime: + return "Datetime" + case ColumnTypeYear: + return "Year" + case ColumnTypeNewDate: + return "NewDate" + case ColumnTypeVarchar: + return "Varchar" + case ColumnTypeBit: + return "Bit" + case ColumnTypeTimestamp2: + return "Timestamp2" + case ColumnTypeDatetime2: + return "Datetime2" + case ColumnTypeTime2: + return "Time2" + case ColumnTypeJSON: + return "JSON" + case ColumnTypeNewDecimal: + return "NewDecimal" + case ColumnTypeEnum: + return "Enum" + case ColumnTypeSet: + return "Set" + case ColumnTypeTinyblob: + return "Tinyblob" + case ColumnTypeMediumblob: + return "Mediumblob" + case ColumnTypeLongblob: + return "Longblob" + case ColumnTypeBlob: + return "Blob" + case ColumnTypeVarstring: + return "Varstring" + case ColumnTypeString: + return "String" + case ColumnTypeGeometry: + return "Geometry" + default: + return fmt.Sprintf("Unknown(%d)", ct) + } +} diff --git a/parser/encoding.go b/mysql/encoding.go similarity index 62% rename from parser/encoding.go rename to mysql/encoding.go index 2523554..db48286 100644 --- a/parser/encoding.go +++ b/mysql/encoding.go @@ -1,8 +1,7 @@ -package parser +package mysql import ( "encoding/binary" - "fmt" ) // Protocol::FixedLengthInteger @@ -12,61 +11,75 @@ import ( // int<1> -func encodeUint8(data []byte, v uint8) { +// EncodeUint8 encodes given uint8 value into a slice of bytes. 
+func EncodeUint8(data []byte, v uint8) { data[0] = v } -func decodeUint8(data []byte) uint8 { +// DecodeUint8 decodes a uint8 value from a given slice of bytes. +func DecodeUint8(data []byte) uint8 { return uint8(data[0]) } // int<2> -func encodeUint16(data []byte, v uint16) { +// EncodeUint16 encodes given uint16 value into a slice of bytes. +func EncodeUint16(data []byte, v uint16) { binary.LittleEndian.PutUint16(data, v) } -func decodeUint16(data []byte) uint16 { +// DecodeUint16 decodes a uint16 value from a given slice of bytes. +func DecodeUint16(data []byte) uint16 { return binary.LittleEndian.Uint16(data) } // int<3> -func encodeUint24(data []byte, v uint32) { +// EncodeUint24 encodes given uint32 value as a 3-byte integer into a slice of +// bytes. +func EncodeUint24(data []byte, v uint32) { encodeVarLen64(data, uint64(v), 3) } -func decodeUint24(data []byte) uint32 { +// DecodeUint24 decodes 3 bytes as uint32 value from a given slice of bytes. +func DecodeUint24(data []byte) uint32 { return uint32(decodeVarLen64(data, 3)) } // int<4> -func encodeUint32(data []byte, v uint32) { +// EncodeUint32 encodes given uint32 value into a slice of bytes. +func EncodeUint32(data []byte, v uint32) { binary.LittleEndian.PutUint32(data, v) } -func decodeUint32(data []byte) uint32 { +// DecodeUint32 decodes a uint32 value from a given slice of bytes. +func DecodeUint32(data []byte) uint32 { return binary.LittleEndian.Uint32(data) } // int<6> -func encodeUint48(data []byte, v uint64) { +// EncodeUint48 encodes given uint64 value as a 6-byte integer into a slice of +// bytes. +func EncodeUint48(data []byte, v uint64) { encodeVarLen64(data, v, 6) } -func decodeUint48(data []byte) uint64 { +// DecodeUint48 decodes 6 bytes as uint64 value from a given slice of bytes. +func DecodeUint48(data []byte) uint64 { return decodeVarLen64(data, 6) } // int<8> -func encodeUint64(data []byte, v uint64) { +// EncodeUint64 encodes given uint64 value into a slice of bytes. 
+func EncodeUint64(data []byte, v uint64) { binary.LittleEndian.PutUint64(data, v) } -func decodeUint64(data []byte) uint64 { +// DecodeUint64 decodes a uint64 value from a given slice of bytes. +func DecodeUint64(data []byte) uint64 { return binary.LittleEndian.Uint64(data) } @@ -74,13 +87,16 @@ func decodeUint64(data []byte) uint64 { // An integer that consumes 1, 3, 4, or 9 bytes, depending on its numeric value. // Spec: https://dev.mysql.com/doc/internals/en/integer.html#length-encoded-integer +// EncodeUintLenEnc writes a length-encoded integer into a given slice of bytes +// and returns the length of an encoded value. +// // To convert a number value into a length-encoded integer: // If the value is < 251, it is stored as a 1-byte integer. // If the value is ≥ 251 and < (2^16), it is stored as 0xFC + 2-byte integer. // If the value is ≥ (2^16) and < (2^24), it is stored as 0xFD + 3-byte integer. // If the value is ≥ (2^24) and < (2^64) it is stored as 0xFE + 8-byte integer. // Note: up to MySQL 3.22, 0xFE was followed by a 4-byte integer. -func encodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) { +func EncodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) { switch { case isNull: data[0] = 0xFB @@ -103,6 +119,8 @@ func encodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) { } } +// DecodeUintLenEnc decodes a length-encoded integer from a given slice of bytes. +// // To convert a length-encoded integer into its numeric value, check the first // byte: // If it is < 0xFB, treat it as a 1-byte integer. @@ -117,7 +135,7 @@ func encodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) { // is 0xFE, you must check the length of the packet to verify that it has enough // space for a 8-byte integer. // If not, it may be an EOF_Packet instead. 
-func decodeUintLenEnc(data []byte) (v uint64, isNull bool, size int) { +func DecodeUintLenEnc(data []byte) (v uint64, isNull bool, size int) { switch data[0] { case 0xFB: return 0xFB, true, 1 @@ -152,7 +170,11 @@ func decodeVarLen64(data []byte, s int) uint64 { // Protocol::NulTerminatedString // Strings that are terminated by a 0x00 byte. -func decodeStringNullTerm(data []byte) []byte { +// Spec: https://dev.mysql.com/doc/internals/en/string.html + +// DecodeStringNullTerm decodes a null terminated string from a given slice of +// bytes. +func DecodeStringNullTerm(data []byte) []byte { for i, c := range data { if c == 0x00 { s := make([]byte, i+1) @@ -169,22 +191,31 @@ func decodeStringNullTerm(data []byte) []byte { // Protocol::VariableLengthString // The length of the string is determined by another field or is calculated at // runtime. + // Protocol::FixedLengthString // Fixed-length strings have a known, hardcoded length. -func encodeStringVarLen(data, str []byte) { + +// EncodeStringVarLen encodes a variable-length string into a given slice of +// bytes. +func EncodeStringVarLen(data, str []byte) { copy(data, str) } -func decodeStringVarLen(data []byte, n int) []byte { - return decodeStringEOF(data[:n]) +// DecodeStringVarLen decodes a varible-length string from a given slice of +// bytes. +func DecodeStringVarLen(data []byte, n int) []byte { + return DecodeStringEOF(data[:n]) } // Protocol::LengthEncodedString // A length encoded string is a string that is prefixed with length encoded // integer describing the length of the string. // It is a special case of Protocol::VariableLengthString -func decodeStringLenEnc(data []byte) (str []byte, size int) { - strlen, _, size := decodeUintLenEnc(data) + +// DecodeStringLenEnc decodes a length-encoded string from a given slice of +// bytes. 
+func DecodeStringLenEnc(data []byte) (str []byte, size int) { + strlen, _, size := DecodeUintLenEnc(data) strleni := int(strlen) s := make([]byte, strleni) copy(s, data[size:size+strleni]) @@ -194,18 +225,10 @@ func decodeStringLenEnc(data []byte) (str []byte, size int) { // Protocol::RestOfPacketString // If a string is the last component of a packet, its length can be calculated // from the overall packet length minus the current position. -func decodeStringEOF(data []byte) []byte { + +// DecodeStringEOF copies given slice of bytes as a new string. +func DecodeStringEOF(data []byte) []byte { s := make([]byte, len(data)) copy(s, data) return s } - -func trimString(str []byte) string { - fmt.Println(str, string(str)) - for i, c := range str { - if c == 0x00 { - return string(str[:i]) - } - } - return string(str) -} diff --git a/parser/buffer.go b/parser/buffer.go deleted file mode 100644 index 0319963..0000000 --- a/parser/buffer.go +++ /dev/null @@ -1,115 +0,0 @@ -package parser - -import ( - "encoding/binary" -) - -// buffer is a simple wrapper over a slice of bytes with a cursor. It allows for -// easy command building and results parsing. -type buffer struct { - data []byte - pos int -} - -// skip next n bytes -func (b *buffer) skip(n int) { - b.pos += n -} - -// advance skips next N bytes and returns them -func (b *buffer) advance(n int) []byte { - b.skip(n) - return b.data[b.pos-n:] -} - -// cur returns remaining unread buffer. -func (b *buffer) cur() []byte { - return b.data[b.pos:] -} - -// newReadBuffer creates a buffer with command output. 
-func newReadBuffer(data []byte) *buffer { - return &buffer{data: data} -} - -func (b *buffer) readUint8() uint8 { - return decodeUint8(b.advance(1)) -} - -func (b *buffer) readUint16() uint16 { - return decodeUint16(b.advance(2)) -} - -func (b *buffer) readUint24() uint32 { - return decodeUint24(b.advance(3)) -} - -func (b *buffer) readUint32() uint32 { - return decodeUint32(b.advance(4)) -} - -func (b *buffer) readUint48() uint64 { - return decodeUint48(b.advance(6)) -} - -func (b *buffer) readUint64() uint64 { - return decodeUint64(b.advance(8)) -} - -func (b *buffer) readUintLenEnc() (val uint64, isNull bool) { - var size int - val, isNull, size = decodeUintLenEnc(b.cur()) - b.skip(size) - return -} - -func (b *buffer) readStringNullTerm() []byte { - str := decodeStringNullTerm(b.cur()) - b.skip(len(str) + 1) - return str -} - -func (b *buffer) readStringVarLen(n int) []byte { - return decodeStringVarLen(b.advance(n), n) -} - -func (b *buffer) readStringLenEnc() []byte { - str, size := decodeStringLenEnc(b.cur()) - b.skip(size) - return str -} - -func (b *buffer) readStringEOF() []byte { - return decodeStringEOF(b.cur()) -} - -// Pre-allocate command buffer. First four bytes would be used to set command -// length and sequence number. 
-func newCommandBuffer(size int) *buffer { - return &buffer{data: make([]byte, size+4), pos: 4} -} - -func (b *buffer) writeByte(v byte) { - b.data[b.pos] = v - b.pos++ -} - -func (b *buffer) writeUint16(v uint16) { - binary.LittleEndian.PutUint16(b.data[b.pos:], v) - b.pos += 2 -} - -func (b *buffer) writeUint32(v uint32) { - binary.LittleEndian.PutUint32(b.data[b.pos:], v) - b.pos += 4 -} - -func (b *buffer) writeString(s string) { - b.data[b.pos] = byte(len(s)) - b.pos++ - b.pos += copy(b.data[b.pos:], s) -} - -func (b *buffer) writeStringEOF(s string) { - b.pos += copy(b.data[b.pos:], s) -} diff --git a/parser/column_types.go b/parser/column_types.go deleted file mode 100644 index 1e84ce4..0000000 --- a/parser/column_types.go +++ /dev/null @@ -1,112 +0,0 @@ -package parser - -import ( - "fmt" -) - -type columnType byte - -// Spec: https://dev.mysql.com/doc/internals/en/com-query-response.html#column-type -const ( - colTypeDecimal columnType = 0x00 - colTypeTiny columnType = 0x01 - colTypeShort columnType = 0x02 - colTypeLong columnType = 0x03 - colTypeFloat columnType = 0x04 - colTypeDouble columnType = 0x05 - colTypeNull columnType = 0x06 - colTypeTimestamp columnType = 0x07 - colTypeLonglong columnType = 0x08 - colTypeInt24 columnType = 0x09 - colTypeDate columnType = 0x0a - colTypeTime columnType = 0x0b - colTypeDatetime columnType = 0x0c - colTypeYear columnType = 0x0d - colTypeNewDate columnType = 0x0e // Internal - colTypeVarchar columnType = 0x0f - colTypeBit columnType = 0x10 - colTypeTimestamp2 columnType = 0x11 // Internal - colTypeDatetime2 columnType = 0x12 // Internal - colTypeTime2 columnType = 0x13 // Internal - - colTypeJSON columnType = 0xF5 - colTypeNewDecimal columnType = 0xF6 - colTypeEnum columnType = 0xF7 - colTypeSet columnType = 0xF8 - colTypeTinyblob columnType = 0xF9 - colTypeMediumblob columnType = 0xFA - colTypeLongblob columnType = 0xFB - colTypeBlob columnType = 0xFC - colTypeVarstring columnType = 0xFD - colTypeString columnType = 
0xFE - colTypeGeometry columnType = 0xFF -) - -func (ct columnType) String() string { - switch ct { - case colTypeDecimal: - return "Decimal" - case colTypeTiny: - return "Tiny" - case colTypeShort: - return "Short" - case colTypeLong: - return "Long" - case colTypeFloat: - return "Float" - case colTypeDouble: - return "Double" - case colTypeNull: - return "Null" - case colTypeTimestamp: - return "Timestamp" - case colTypeLonglong: - return "Longlong" - case colTypeInt24: - return "Int24" - case colTypeDate: - return "Date" - case colTypeTime: - return "Time" - case colTypeDatetime: - return "Datetime" - case colTypeYear: - return "Year" - case colTypeNewDate: - return "NewDate" - case colTypeVarchar: - return "Varchar" - case colTypeBit: - return "Bit" - case colTypeTimestamp2: - return "Timestamp2" - case colTypeDatetime2: - return "Datetime2" - case colTypeTime2: - return "Time2" - case colTypeJSON: - return "JSON" - case colTypeNewDecimal: - return "NewDecimal" - case colTypeEnum: - return "Enum" - case colTypeSet: - return "Set" - case colTypeTinyblob: - return "Tinyblob" - case colTypeMediumblob: - return "Mediumblob" - case colTypeLongblob: - return "Longblob" - case colTypeBlob: - return "Blob" - case colTypeVarstring: - return "Varstring" - case colTypeString: - return "String" - case colTypeGeometry: - return "Geometry" - default: - return fmt.Sprintf("Unknown(%d)", ct) - } -} diff --git a/parser/encoding_test.go b/parser/encoding_test.go deleted file mode 100644 index 18ce7be..0000000 --- a/parser/encoding_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package parser - -import "testing" - -func TestEncodeUint8(t *testing.T) { - buf := make([]byte, 1) - encodeUint8(buf, 123) - t.Log(buf) -} diff --git a/parser/event_header.go b/parser/event_header.go deleted file mode 100644 index 940cde2..0000000 --- a/parser/event_header.go +++ /dev/null @@ -1,92 +0,0 @@ -package parser - -import ( - "errors" - - "github.com/localhots/pretty" -) - -var ( - // ErrInvalidHeader 
is returned when event header cannot be parsed. - ErrInvalidHeader = errors.New("Header is invalid") -) - -// EventHeader represents binlog event header. -type EventHeader struct { - Timestamp uint32 - Type EventType - ServerID uint32 - EventLen uint32 - NextOffset uint32 - Flags uint16 - ExtraHeaders []byte - - eventBody []byte -} - -// Spec: https://dev.mysql.com/doc/internals/en/event-header-fields.html -func (r *Reader) parseHeader(data []byte) (*EventHeader, error) { - headerLen := r.headerLen() - if len(data) < headerLen { - return nil, ErrInvalidHeader - } - // pretty.Println(headerLen, data) - - buf := newReadBuffer(data) - h := &EventHeader{ - Timestamp: buf.readUint32(), - Type: EventType(buf.readUint8()), - ServerID: buf.readUint32(), - EventLen: buf.readUint32(), - } - if r.format.Version == 0 || r.format.Version >= 3 { - h.NextOffset = buf.readUint32() - h.Flags = buf.readUint16() - } - if r.format.Version >= 4 { - h.ExtraHeaders = buf.readStringVarLen(headerLen - 19) - } - h.eventBody = buf.cur() - - if h.NextOffset > 0 { - r.state.Offset = uint64(h.NextOffset) - } - - csa := r.format.ServerDetails.ChecksumAlgorithm - if h.Type != FormatDescriptionEvent && csa == ChecksumAlgorithmCRC32 { - h.eventBody = h.eventBody[:len(h.eventBody)-4] - } - - // pretty.Println(h) - - switch h.Type { - case FormatDescriptionEvent: - r.format = decodeFormatDescription(h.eventBody) - pretty.Println(h.Type.String(), r.format) - case RotateEvent: - r.state = r.decodeRotateEvent(h.eventBody) - pretty.Println(h.Type.String(), r.state) - case TableMapEvent: - tm := r.decodeTableMap(h.eventBody) - r.tableMap[tm.TableID] = tm - // pretty.Println(h.Type.String(), tm) - case WriteRowsEventV0, WriteRowsEventV1, WriteRowsEventV2, - UpdateRowsEventV0, UpdateRowsEventV1, UpdateRowsEventV2, - DeleteRowsEventV0, DeleteRowsEventV1, DeleteRowsEventV2: - r.decodeRowsEvent(h.eventBody, h.Type) - case XIDEvent, GTIDEvent: - // TODO: Add support for these too - case QueryEvent: - // TODO: 
Handle schema changes - } - - return h, nil -} - -func (r *Reader) headerLen() int { - const defaultHeaderLength = 19 - if r.format.EventHeaderLength > 0 { - return int(r.format.EventHeaderLength) - } - return defaultHeaderLength -} diff --git a/parser/event_rotate.go b/parser/event_rotate.go deleted file mode 100644 index b728851..0000000 --- a/parser/event_rotate.go +++ /dev/null @@ -1,13 +0,0 @@ -package parser - -func (r *Reader) decodeRotateEvent(data []byte) Position { - buf := newReadBuffer(data) - var p Position - if r.format.Version > 1 { - p.Offset = buf.readUint64() - } else { - p.Offset = 4 - } - p.File = string(buf.readStringEOF()) - return p -} diff --git a/parser/event_rows.go b/parser/event_rows.go deleted file mode 100644 index a689cb7..0000000 --- a/parser/event_rows.go +++ /dev/null @@ -1,169 +0,0 @@ -package parser - -import ( - "fmt" - - "github.com/localhots/pretty" -) - -// Rows contains a Rows Event. -type Rows struct { - EventType EventType - TableID uint64 - Flags uint16 - ExtraData []byte - ColumnCount uint64 - ColumnBitmap1 []byte - ColumnBitmap2 []byte - Rows [][]interface{} - TableMap *TableMap -} - -type rowsFlag uint16 - -const ( - rowsFlagEndOfStatement rowsFlag = 0x0001 - rowsFlagNoForeignKeyChecks rowsFlag = 0x0002 - rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004 - rowsFlagRowHasColumns rowsFlag = 0x0008 - - freeTableMapID = 0x00FFFFFF -) - -func (r *Reader) decodeRowsEvent(data []byte, typ EventType) { - // pretty.Println(data) - buf := newReadBuffer(data) - rows := Rows{EventType: typ} - idSize := r.format.tableIDSize(typ) - if idSize == 6 { - rows.TableID = buf.readUint48() - } else { - rows.TableID = uint64(buf.readUint32()) - } - - rows.Flags = buf.readUint16() - - if typ.isEither(WriteRowsEventV2, UpdateRowsEventV2, DeleteRowsEventV2) { - // Extra data length is part of extra data, deduct 2 bytes as they - // already store its length - extraLen := buf.readUint16() - 2 - rows.ExtraData = buf.readStringVarLen(int(extraLen)) - } - 
- rows.ColumnCount, _ = buf.readUintLenEnc() - rows.ColumnBitmap1 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) - if typ.isEither(UpdateRowsEventV2, UpdateRowsEventV1) { - rows.ColumnBitmap2 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8) - } - - tm, ok := r.tableMap[rows.TableID] - if !ok { - panic(fmt.Errorf("Out of sync: no table map definition for ID=%d", rows.TableID)) - } - rows.TableMap = &tm - - pretty.Println(typ.String(), rows, tm, buf.cur()) - - rows.decodeRows(buf, rows.ColumnBitmap1) -} - -func (r *Rows) decodeRows(buf *buffer, bm []byte) { - count := 0 - for i := 0; i < int(r.ColumnCount); i++ { - if isBitSet(bm, i) { - count++ - } - } - count = (count + 7) / 8 - - nullBM := buf.readStringVarLen(count) - nullCnt := 0 - row := make([]interface{}, r.ColumnCount) - - pretty.Println(count, nullBM) - - var err error - for i := 0; i < int(r.ColumnCount); i++ { - if !isBitSet(bm, i) { - continue - } - - isNull := (uint32(nullBM[nullCnt/8]) >> uint32(nullCnt%8)) & 0x01 - nullCnt++ - if isNull > 0 { - row[i] = nil - continue - } - - row[i], err = r.decodeValue(buf, columnType(r.TableMap.ColumnTypes[i]), r.TableMap.ColumnMeta[i]) - - if err != nil { - panic(err) - } - } -} - -func (r *Rows) decodeValue(buf *buffer, ct columnType, meta uint16) (interface{}, error) { - switch ct { - case colTypeDecimal: - pretty.Println("Type", ct.String()) - case colTypeTiny: - pretty.Println("Type", ct.String()) - case colTypeShort: - pretty.Println("Type", ct.String()) - case colTypeLong: - pretty.Println("Type", ct.String()) - case colTypeFloat: - pretty.Println("Type", ct.String()) - case colTypeDouble: - pretty.Println("Type", ct.String()) - case colTypeNull: - pretty.Println("Type", ct.String()) - case colTypeTimestamp: - pretty.Println("Type", ct.String()) - case colTypeLonglong: - pretty.Println("Type", ct.String()) - case colTypeInt24: - pretty.Println("Type", ct.String()) - case colTypeDate: - pretty.Println("Type", ct.String()) - case colTypeTime: - 
pretty.Println("Type", ct.String()) - case colTypeDatetime: - pretty.Println("Type", ct.String()) - case colTypeYear: - pretty.Println("Type", ct.String()) - case colTypeVarchar: - pretty.Println("Type", ct.String()) - case colTypeBit: - pretty.Println("Type", ct.String()) - - case colTypeJSON: - pretty.Println("Type", ct.String()) - case colTypeNewDecimal: - pretty.Println("Type", ct.String()) - case colTypeEnum: - pretty.Println("Type", ct.String()) - case colTypeSet: - pretty.Println("Type", ct.String()) - case colTypeTinyblob: - pretty.Println("Type", ct.String()) - case colTypeMediumblob: - pretty.Println("Type", ct.String()) - case colTypeLongblob: - pretty.Println("Type", ct.String()) - case colTypeBlob: - pretty.Println("Type", ct.String()) - case colTypeVarstring: - pretty.Println("Type", ct.String()) - case colTypeString: - pretty.Println("Type", ct.String()) - case colTypeGeometry: - pretty.Println("Type", ct.String()) - } - return nil, nil -} - -func isBitSet(bm []byte, i int) bool { - return bm[i>>3]&(1<<(uint(i)&7)) > 0 -} diff --git a/parser/event_table_map.go b/parser/event_table_map.go deleted file mode 100644 index 2367f39..0000000 --- a/parser/event_table_map.go +++ /dev/null @@ -1,59 +0,0 @@ -package parser - -// TableMap ... 
-type TableMap struct { - TableID uint64 - Flags uint16 - SchemaName string - TableName string - ColumnCount uint64 - ColumnTypes []byte - ColumnMeta []uint16 - NullBitmask []byte -} - -// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html -func (r *Reader) decodeTableMap(data []byte) TableMap { - buf := newReadBuffer(data) - var tm TableMap - idSize := r.format.tableIDSize(TableMapEvent) - if idSize == 6 { - tm.TableID = buf.readUint48() - } else { - tm.TableID = uint64(buf.readUint32()) - } - - tm.Flags = buf.readUint16() - tm.SchemaName = string(buf.readStringLenEnc()) - buf.skip(1) // Always 0x00 - tm.TableName = string(buf.readStringLenEnc()) - buf.skip(1) // Always 0x00 - tm.ColumnCount, _ = buf.readUintLenEnc() - tm.ColumnTypes = buf.readStringVarLen(int(tm.ColumnCount)) - tm.ColumnMeta = decodeColumnMeta(buf.readStringLenEnc(), tm.ColumnTypes) - tm.NullBitmask = buf.readStringVarLen(int(tm.ColumnCount+8) / 7) - - return tm -} - -func decodeColumnMeta(data []byte, cols []byte) []uint16 { - pos := 0 - meta := make([]uint16, len(cols)) - for i, typ := range cols { - switch columnType(typ) { - case colTypeString, colTypeNewDecimal: - // TODO: Is that correct? - meta[i] = uint16(data[pos])<<8 | uint16(data[pos+1]) - pos += 2 - case colTypeVarchar, colTypeVarstring, colTypeBit: - // TODO: Is that correct? - meta[i] = decodeUint16(data[pos:]) - pos += 2 - case colTypeFloat, colTypeDouble, colTypeBlob, colTypeGeometry, colTypeJSON, - colTypeTime2, colTypeDatetime2, colTypeTimestamp2: - meta[i] = uint16(data[pos]) - pos++ - } - } - return meta -} diff --git a/parser/event_types.go b/parser/event_types.go deleted file mode 100644 index 92596d7..0000000 --- a/parser/event_types.go +++ /dev/null @@ -1,212 +0,0 @@ -package parser - -import ( - "fmt" -) - -// EventType defines a binary log event type. 
-type EventType byte - -// Spec: https://dev.mysql.com/doc/internals/en/event-classes-and-types.html -const ( - // UnknownEvent is an event that should never occur. - UnknownEvent EventType = 0 - // StartEventV3 is the Start_event of binlog format 3. - StartEventV3 EventType = 1 - // QueryEvent is created for each query that modifies the database, unless - // the query is logged row-based. - QueryEvent EventType = 2 - // StopEvent is written to the log files under these circumstances: - // A master writes the event to the binary log when it shuts down. - // A slave writes the event to the relay log when it shuts down or when a - // RESET SLAVE statement is executed. - StopEvent EventType = 3 - // RotateEvent is written at the end of the file that points to the next - // file in the squence. It is written when a binary log file exceeds a size - // limit. - RotateEvent EventType = 4 - // IntvarEvent will be created just before a Query_event, if the query uses - // one of the variables LAST_INSERT_ID or INSERT_ID. - IntvarEvent EventType = 5 - // LoadEvent ... - LoadEvent EventType = 6 - // SlaveEvent ... - SlaveEvent EventType = 7 - // CreateFileEvent ... - CreateFileEvent EventType = 8 - // AppendBlockEvent is created to contain the file data. - AppendBlockEvent EventType = 9 - // ExecLoadEvent ... - ExecLoadEvent EventType = 10 - // DeleteFileEvent occurs when the LOAD DATA failed on the master. - // This event notifies the slave not to do the load and to delete the - // temporary file. - DeleteFileEvent EventType = 11 - // NewLoadEvent ... - NewLoadEvent EventType = 12 - // RandEvent logs random seed used by the next RAND(), and by PASSWORD() - // in 4.1.0. - RandEvent EventType = 13 - // UserVarEvent is written every time a statement uses a user variable; - // precedes other events for the statement. Indicates the value to use for - // the user variable in the next statement. This is written only before a - // QUERY_EVENT and is not used with row-based logging. 
- UserVarEvent EventType = 14 - // FormatDescriptionEvent is saved by threads which read it, as they need it - // for future use (to decode the ordinary events). - FormatDescriptionEvent EventType = 15 - // XIDEvent is generated for a commit of a transaction that modifies one or - // more tables of an XA-capable storage engine. - XIDEvent EventType = 16 - // BeginLoadQueryEvent is for the first block of file to be loaded, its only - // difference from Append_block event is that this event creates or - // truncates existing file before writing data. - BeginLoadQueryEvent EventType = 17 - // ExecuteLoadQueryEvent is responsible for LOAD DATA execution, it similar - // to Query_event but before executing the query it substitutes original - // filename in LOAD DATA query with name of temporary file. - ExecuteLoadQueryEvent EventType = 18 - // TableMapEvent is used in row-based mode where it preceeds every row - // operation event and maps a table definition to a number. The table - // definition consists of database name, table name, and column definitions. - TableMapEvent EventType = 19 - // WriteRowsEventV0 represents inserted rows. Used in MySQL 5.1.0 to 5.1.15. - WriteRowsEventV0 EventType = 20 - // UpdateRowsEventV0 represents updated rows. It contains both old and new - // versions. Used in MySQL 5.1.0 to 5.1.15. - UpdateRowsEventV0 EventType = 21 - // DeleteRowsEventV0 represents deleted rows. Used in MySQL 5.1.0 to 5.1.15. - DeleteRowsEventV0 EventType = 22 - // WriteRowsEventV1 represents inserted rows. Used in MySQL 5.1.15 to 5.6. - WriteRowsEventV1 EventType = 23 - // UpdateRowsEventV1 represents updated rows. It contains both old and new - // versions. Used in MySQL 5.1.15 to 5.6. - UpdateRowsEventV1 EventType = 24 - // DeleteRowsEventV1 represents deleted rows. Used in MySQL 5.1.15 to 5.6. - DeleteRowsEventV1 EventType = 25 - // IncidentEvent represents an incident, an occurance out of the ordinary, - // that happened on the master. 
The event is used to inform the slave that - // something out of the ordinary happened on the master that might cause the - // database to be in an inconsistent state. - IncidentEvent EventType = 26 - // HeartbeetEvent is a replication event used to ensure to slave that master - // is alive. The event is originated by master's dump thread and sent - // straight to slave without being logged. Slave itself does not store it in - // relay log but rather uses a data for immediate checks and throws away the - // event. - HeartbeetEvent EventType = 27 - // IgnorableEvent is a kind of event that could be ignored. - IgnorableEvent EventType = 28 - // RowsQueryEvent is a subclass of the IgnorableEvent, to record the - // original query for the rows events in RBR. - RowsQueryEvent EventType = 29 - // WriteRowsEventV2 represents inserted rows. Used starting from MySQL 5.6. - WriteRowsEventV2 EventType = 30 - // UpdateRowsEventV2 represents updated rows. It contains both old and new - // versions. Used starting from MySQL 5.6. - UpdateRowsEventV2 EventType = 31 - // DeleteRowsEventV2 represents deleted rows. Used starting from MySQL 5.6. - DeleteRowsEventV2 EventType = 32 - // GTIDEvent is an event that contains latest GTID. - // GTID stands for Global Transaction IDentifier It is composed of two - // parts: - // * SID for Source Identifier, and - // * GNO for Group Number. The basic idea is to associate an identifier, the - // Global Transaction IDentifier or GTID, to every transaction. When a - // transaction is copied to a slave, re-executed on the slave, and written - // to the slave's binary log, the GTID is preserved. When a slave connects - // to a master, the slave uses GTIDs instead of (file, offset). - GTIDEvent EventType = 33 - // AnonymousGTIDEvent is a subclass of GTIDEvent. - AnonymousGTIDEvent EventType = 34 - // PreviousGTIDsEvent is a subclass of GTIDEvent. 
- PreviousGTIDsEvent EventType = 35 -) - -func (et EventType) isEither(types ...EventType) bool { - for _, t := range types { - if et == t { - return true - } - } - return false -} - -func (et EventType) String() string { - switch et { - case UnknownEvent: - return "UnknownEvent" - case StartEventV3: - return "StartEventV3" - case QueryEvent: - return "QueryEvent" - case StopEvent: - return "StopEvent" - case RotateEvent: - return "RotateEvent" - case IntvarEvent: - return "IntvarEvent" - case LoadEvent: - return "LoadEvent" - case SlaveEvent: - return "SlaveEvent" - case CreateFileEvent: - return "CreateFileEvent" - case AppendBlockEvent: - return "AppendBlockEvent" - case ExecLoadEvent: - return "ExecLoadEvent" - case DeleteFileEvent: - return "DeleteFileEvent" - case NewLoadEvent: - return "NewLoadEvent" - case RandEvent: - return "RandEvent" - case UserVarEvent: - return "UserVarEvent" - case FormatDescriptionEvent: - return "FormatDescriptionEvent" - case XIDEvent: - return "XIDEvent" - case BeginLoadQueryEvent: - return "BeginLoadQueryEvent" - case ExecuteLoadQueryEvent: - return "ExecuteLoadQueryEvent" - case TableMapEvent: - return "TableMapEvent" - case WriteRowsEventV0: - return "WriteRowsEventV0" - case UpdateRowsEventV0: - return "UpdateRowsEventV0" - case DeleteRowsEventV0: - return "DeleteRowsEventV0" - case WriteRowsEventV1: - return "WriteRowsEventV1" - case UpdateRowsEventV1: - return "UpdateRowsEventV1" - case DeleteRowsEventV1: - return "DeleteRowsEventV1" - case IncidentEvent: - return "IncidentEvent" - case HeartbeetEvent: - return "HeartbeetEvent" - case IgnorableEvent: - return "IgnorableEvent" - case RowsQueryEvent: - return "RowsQueryEvent" - case WriteRowsEventV2: - return "WriteRowsEventV2" - case UpdateRowsEventV2: - return "UpdateRowsEventV2" - case DeleteRowsEventV2: - return "DeleteRowsEventV2" - case GTIDEvent: - return "GTIDEvent" - case AnonymousGTIDEvent: - return "AnonymousGTIDEvent" - case PreviousGTIDsEvent: - return 
"PreviousGTIDsEvent" - default: - return fmt.Sprintf("Unknown(%d)", et) - } -} diff --git a/parser/reader.go b/parser/reader.go deleted file mode 100644 index 29aa27c..0000000 --- a/parser/reader.go +++ /dev/null @@ -1,197 +0,0 @@ -package parser - -import ( - "context" - "database/sql/driver" - "fmt" - "io" - "os" - - "github.com/juju/errors" - "github.com/localhots/gobelt/log" - "github.com/localhots/mysql" -) - -// Reader ... -type Reader struct { - conn *mysql.ExtendedConn - conf Config - state Position - format FormatDescription - tableMap map[uint64]TableMap -} - -// Config ... -type Config struct { - ServerID uint32 - File string - Offset uint32 - Hostname string -} - -// Position ... -type Position struct { - File string - Offset uint64 -} - -const ( - // Bytes - resultOK byte = 0x00 - resultEOF byte = 0xFE - resultERR byte = 0xFF -) - -// NewReader ... -func NewReader(conn driver.Conn, conf Config) (*Reader, error) { - if conf.Hostname == "" { - name, err := os.Hostname() - if err != nil { - return nil, err - } - conf.Hostname = name - } - - extconn, err := mysql.ExtendConn(conn) - if err != nil { - return nil, err - } - r := &Reader{ - conn: extconn, - conf: conf, - tableMap: make(map[uint64]TableMap), - } - - if err := r.disableChecksum(); err != nil { - return nil, errors.Annotate(err, "Failed to disable binlog checksum") - } - if err := r.registerSlave(); err != nil { - return nil, errors.Annotate(err, "Failed to register slave server") - } - if err := r.binlogDump(); err != nil { - return nil, errors.Annotate(err, "Failed to start binlog dump") - } - - return r, nil -} - -// Connect ... -func Connect(dsn string, conf Config) (*Reader, error) { - conn, err := (&mysql.MySQLDriver{}).Open(dsn) - if err != nil { - return nil, err - } - return NewReader(conn, conf) -} - -// ReadEventHeader reads next event from the log and decodes its header. Header -// is then used to decode the event. 
-func (r *Reader) ReadEventHeader(ctx context.Context) (*EventHeader, error) { - data, err := r.conn.ReadPacket() - if err != nil { - return nil, err - } - switch data[0] { - case resultOK: - return r.parseHeader(data[1:]) - case resultERR: - return nil, r.conn.HandleErrorPacket(data) - case resultEOF: - log.Debug(ctx, "EOF received") - return nil, nil - default: - log.Errorf(ctx, "Unexpected header: %x", data[0]) - return nil, nil - } -} - -// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html -func (r *Reader) registerSlave() error { - const comRegisterSlave byte = 21 - r.conn.ResetSequence() - - buf := newCommandBuffer(1 + 4 + 1 + len(r.conf.Hostname) + 1 + 1 + 2 + 4 + 4) - buf.writeByte(comRegisterSlave) - buf.writeUint32(r.conf.ServerID) - buf.writeString(r.conf.Hostname) - // The rest of the payload would be zeroes, consider following code for - // reference: - // - // buf.writeString(username) - // buf.writeString(password) - // buf.writeUint16(port) - // buf.writeUint32(replicationRank) - // buf.writeUint32(masterID) - - return r.runCmd(buf) -} - -// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html -// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html -func (r *Reader) binlogDump() error { - const comBinlogDump byte = 18 - r.conn.ResetSequence() - - r.state.File = r.conf.File - r.state.Offset = uint64(r.conf.Offset) - // First event offset is 4 - if r.state.Offset < 4 { - r.state.Offset = 4 - } - - buf := newCommandBuffer(1 + 4 + 2 + 4 + len(r.state.File)) - buf.writeByte(comBinlogDump) - buf.writeUint32(uint32(r.state.Offset)) - buf.skip(2) // Flags - buf.writeUint32(r.conf.ServerID) - buf.writeStringEOF(r.state.File) - - return r.runCmd(buf) -} - -func (r *Reader) runCmd(buf *buffer) error { - err := r.conn.WritePacket(buf.data) - if err != nil { - return err - } - return r.conn.ReadResultOK() -} - -func (r *Reader) disableChecksum() error { - cs, err := r.getVar("BINLOG_CHECKSUM") - if err != nil { 
- return err - } - - if cs != "NONE" { - return r.setVar("@master_binlog_checksum", "NONE") - } - return nil -} - -func (r *Reader) getVar(name string) (string, error) { - rows, err := r.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{}) - if err != nil { - return "", notEOF(err) - } - defer rows.Close() - - res := make([]driver.Value, len(rows.Columns())) - err = rows.Next(res) - if err != nil { - return "", notEOF(err) - } - - return string(res[1].([]byte)), nil -} - -func (r *Reader) setVar(name, val string) error { - return r.conn.Exec(fmt.Sprintf("SET %s=%q", name, val)) -} - -func notEOF(err error) error { - if err == io.EOF { - return nil - } - return err -} diff --git a/reader/reader.go b/reader/reader.go new file mode 100644 index 0000000..57f0a0f --- /dev/null +++ b/reader/reader.go @@ -0,0 +1,117 @@ +package reader + +import ( + "github.com/juju/errors" + "github.com/localhots/blt/binlog" + "github.com/localhots/blt/schema" + "github.com/localhots/pretty" +) + +// Reader ... +type Reader struct { + conn *SlaveConn + state binlog.Position + format binlog.FormatDescription + tableMap map[uint64]binlog.TableDescription + schema *schema.Schema +} + +// Event ... +type Event struct { + Header binlog.EventHeader + Body []byte +} + +// NewReader ... +func NewReader(conn *SlaveConn) (*Reader, error) { + r := &Reader{ + conn: conn, + tableMap: make(map[uint64]binlog.TableDescription), + } + + if err := conn.DisableChecksum(); err != nil { + return nil, errors.Annotate(err, "disable binlog checksum") + } + if err := conn.RegisterSlave(); err != nil { + return nil, errors.Annotate(err, "register slave server") + } + if err := conn.StartBinlogDump(); err != nil { + return nil, errors.Annotate(err, "start binlog dump") + } + + return r, nil +} + +// ReadEvent ... 
+func (r *Reader) ReadEvent() (*Event, error) {
+	// checksumLen is the number of trailing bytes trimmed from the event body
+	// when the CRC32 checksum algorithm is in effect.
+	const checksumLen = 4
+
+	connBuff, err := r.conn.ReadPacket()
+	if err != nil {
+		return nil, err
+	}
+
+	// The body is sliced at HeaderLen below, so refuse packets that are too
+	// short instead of panicking on an out-of-range slice expression.
+	headerLen := int(r.format.HeaderLen())
+	if len(connBuff) < headerLen {
+		return nil, errors.New("event packet is shorter than the event header")
+	}
+
+	var evt Event
+	if err := evt.Header.Decode(connBuff, r.format); err != nil {
+		return nil, errors.Annotate(err, "decode event header")
+	}
+
+	// Keep the tracked position in sync with the offset announced by the
+	// header.
+	if evt.Header.NextOffset > 0 {
+		r.state.Offset = uint64(evt.Header.NextOffset)
+	}
+
+	evt.Body = connBuff[headerLen:]
+
+	// The format description event is never trimmed; every other event loses
+	// its trailing checksum bytes when CRC32 is in use.
+	csa := r.format.ServerDetails.ChecksumAlgorithm
+	if evt.Header.Type != binlog.EventTypeFormatDescription && csa == binlog.ChecksumAlgorithmCRC32 {
+		if len(evt.Body) < checksumLen {
+			return nil, errors.New("event body is shorter than its checksum")
+		}
+		evt.Body = evt.Body[:len(evt.Body)-checksumLen]
+	}
+
+	// Decode the events that change reader state (format, position, table
+	// map). A decoding error is collected in err and reported after the
+	// switch.
+	switch evt.Header.Type {
+	case binlog.EventTypeFormatDescription:
+		var fde binlog.FormatDescriptionEvent
+		if err = fde.Decode(evt.Body); err == nil {
+			r.format = fde.FormatDescription
+		}
+		pretty.Println(evt.Header.Type.String(), r.format)
+	case binlog.EventTypeRotate:
+		var re binlog.RotateEvent
+		if err = re.Decode(evt.Body, r.format); err == nil {
+			r.state = re.NextFile
+		}
+		pretty.Println(evt.Header.Type.String(), r.state)
+	case binlog.EventTypeTableMap:
+		var tme binlog.TableMapEvent
+		if err = tme.Decode(evt.Body, r.format); err == nil {
+			r.tableMap[tme.TableID] = tme.TableDescription
+		}
+	case binlog.EventTypeWriteRowsV0,
+		binlog.EventTypeWriteRowsV1,
+		binlog.EventTypeWriteRowsV2,
+		binlog.EventTypeUpdateRowsV0,
+		binlog.EventTypeUpdateRowsV1,
+		binlog.EventTypeUpdateRowsV2,
+		binlog.EventTypeDeleteRowsV0,
+		binlog.EventTypeDeleteRowsV1,
+		binlog.EventTypeDeleteRowsV2:
+
+		// Rows can only be decoded with the description stored by an earlier
+		// table map event for the same table ID.
+		re := binlog.RowsEvent{Type: evt.Header.Type}
+		tableID := re.PeekTableID(evt.Body, r.format)
+		td, ok := r.tableMap[tableID]
+		if !ok {
+			return nil, errors.New("Unknown table ID")
+		}
+		if err = re.Decode(evt.Body, r.format, td); err == nil {
+			pretty.Println(re)
+		}
+	case binlog.EventTypeXID:
+		// TODO: Add support
+	case binlog.EventTypeGTID:
+		// TODO: Add support
+	case binlog.EventTypeQuery:
+		// TODO: Handle schema changes
+	}
+
+	// Never hand out an event together with an error: the event is not
+	// meaningful when its body failed to decode.
+	if err != nil {
+		return nil, errors.Annotate(err, "decode event body")
+	}
+	return &evt, nil
+}
diff --git a/reader/slave_conn.go b/reader/slave_conn.go
new file mode 100644
index 0000000..a341781
--- /dev/null
+++ b/reader/slave_conn.go
@@ -0,0 +1,172 @@
+package reader
+
+import (
+	"database/sql/driver"
+	"encoding/hex"
+	"fmt"
+	"io"
+	"os"
+
+	"github.com/localhots/blt/tools"
+	"github.com/localhots/mysql"
+)
+
+// SlaveConn is a MySQL connection that is used to register as a slave on the
+// master and to request a binary log dump from it.
+type SlaveConn struct {
+	conn *mysql.ExtendedConn
+	conf Config
+}
+
+// Config holds the parameters sent to the master by the slave commands.
+type Config struct {
+	// ServerID is sent with both the register slave and binlog dump commands.
+	ServerID uint32
+	// File is the binary log file name sent with the binlog dump command.
+	File string
+	// Offset is the position sent with the binlog dump command. Connect
+	// replaces a zero value with 4.
+	Offset uint32
+	// Hostname is sent with the register slave command. Connect replaces an
+	// empty value with the name reported by the operating system.
+	Hostname string
+}
+
+const (
+	// Commands
+	comRegisterSlave byte = 21
+	comBinlogDump    byte = 18
+
+	// Bytes
+	resultOK  byte = 0x00
+	resultEOF byte = 0xFE
+	resultERR byte = 0xFF
+)
+
+// Connect opens a connection using the given DSN and wraps it into a SlaveConn.
+// An empty conf.Hostname defaults to the OS host name and a zero conf.Offset
+// defaults to 4.
+func Connect(dsn string, conf Config) (*SlaveConn, error) {
+	if conf.Hostname == "" {
+		name, err := os.Hostname()
+		if err != nil {
+			return nil, err
+		}
+		conf.Hostname = name
+	}
+	// NOTE: the host name used to be overwritten with "localhost" right here,
+	// which discarded both the configured value and the lookup above.
+	if conf.Offset == 0 {
+		conf.Offset = 4
+	}
+
+	conn, err := (&mysql.MySQLDriver{}).Open(dsn)
+	if err != nil {
+		return nil, err
+	}
+
+	extconn, err := mysql.ExtendConn(conn)
+	if err != nil {
+		// The raw connection is of no use without the extension; release it.
+		// The extension error is the one worth reporting, so a failure to
+		// close is deliberately ignored.
+		_ = conn.Close()
+		return nil, err
+	}
+
+	return &SlaveConn{conn: extconn, conf: conf}, nil
+}
+
+// ReadPacket reads next packet from the server and processes the first status
+// byte. An OK packet yields its payload without the status byte, an ERR packet
+// is converted into an error, and an EOF packet is reported as io.EOF.
+func (c *SlaveConn) ReadPacket() ([]byte, error) {
+	data, err := c.conn.ReadPacket()
+	if err != nil {
+		return nil, err
+	}
+	// Guard the status byte lookup below against an empty packet.
+	if len(data) == 0 {
+		return nil, io.ErrUnexpectedEOF
+	}
+
+	switch data[0] {
+	case resultOK:
+		return data[1:], nil
+	case resultERR:
+		return nil, c.conn.HandleErrorPacket(data)
+	case resultEOF:
+		// Returning (nil, nil) here would make callers treat a missing
+		// payload as a valid packet.
+		return nil, io.EOF
+	default:
+		return nil, fmt.Errorf("unexpected header: %x", data[0])
+	}
+}
+
+// RegisterSlave issues a REGISTER_SLAVE command to master.
+// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html
+//
+// The hostname is sent with a single length byte, so hostnames longer than 250
+// bytes are rejected before anything is written to the connection.
+func (c *SlaveConn) RegisterSlave() error {
+	// Exactly one byte is reserved for the hostname length below. 250 is the
+	// largest length that a plain single byte and a MySQL length-encoded
+	// integer represent identically.
+	const maxHostnameLen = 250
+	if len(c.conf.Hostname) > maxHostnameLen {
+		return fmt.Errorf("slave hostname is too long: %d bytes, max %d", len(c.conf.Hostname), maxHostnameLen)
+	}
+
+	c.conn.ResetSequence()
+
+	// Payload layout: command (1), server ID (4), hostname length (1),
+	// hostname, username length (1), password length (1), port (2),
+	// replication rank (4), master ID (4).
+	buf := tools.NewCommandBuffer(1 + 4 + 1 + len(c.conf.Hostname) + 1 + 1 + 2 + 4 + 4)
+	buf.WriteByte(comRegisterSlave)
+	buf.WriteUint32(c.conf.ServerID)
+	buf.WriteStringLenEnc(c.conf.Hostname)
+	// The rest of the payload would be zeroes, consider following code for
+	// reference:
+	//
+	// buf.WriteStringLenEnc(username)
+	// buf.WriteStringLenEnc(password)
+	// buf.WriteUint16(port)
+	// buf.WriteUint32(replicationRank)
+	// buf.WriteUint32(masterID)
+
+	// NOTE(review): debug dump of the raw command to stdout. Dropping it also
+	// requires removing the encoding/hex import.
+	fmt.Println(hex.Dump(buf.Bytes()))
+	return c.runCmd(buf.Bytes())
+}
+
+// StartBinlogDump issues a BINLOG_DUMP command to master.
+// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
+// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
+func (c *SlaveConn) StartBinlogDump() error {
+	c.conn.ResetSequence()
+
+	// Payload layout: command (1), offset (4), flags (2), server ID (4),
+	// file name (rest of the packet).
+	buf := tools.NewCommandBuffer(1 + 4 + 2 + 4 + len(c.conf.File))
+	buf.WriteByte(comBinlogDump)
+	buf.WriteUint32(c.conf.Offset)
+	buf.Skip(2) // Flags are left zeroed
+	buf.WriteUint32(c.conf.ServerID)
+	buf.WriteStringEOF(c.conf.File)
+
+	return c.runCmd(buf.Bytes())
+}
+
+// DisableChecksum disables CRC32 checksums for this connection. It reads the
+// BINLOG_CHECKSUM variable and, unless it is already "NONE", sets
+// @master_binlog_checksum to "NONE".
+func (c *SlaveConn) DisableChecksum() error {
+	cs, err := c.GetVar("BINLOG_CHECKSUM")
+	if err != nil {
+		return err
+	}
+	if cs == "NONE" {
+		// Nothing to disable.
+		return nil
+	}
+	return c.SetVar("@master_binlog_checksum", "NONE")
+}
+
+// GetVar fetches value of the given variable.
+func (c *SlaveConn) GetVar(name string) (string, error) {
+	rows, err := c.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{})
+	if err != nil {
+		return "", notEOF(err)
+	}
+	defer rows.Close()
+
+	// An io.EOF from Next means there was no row; notEOF turns that into an
+	// empty value with no error.
+	res := make([]driver.Value, len(rows.Columns()))
+	if err := rows.Next(res); err != nil {
+		return "", notEOF(err)
+	}
+
+	// The value is taken from the second column. Validate the row shape and
+	// the value type instead of panicking on an unexpected result.
+	if len(res) < 2 {
+		return "", fmt.Errorf("unexpected number of columns for variable %q: %d", name, len(res))
+	}
+	switch val := res[1].(type) {
+	case []byte:
+		return string(val), nil
+	case string:
+		return val, nil
+	default:
+		return "", fmt.Errorf("unexpected value type for variable %q: %T", name, res[1])
+	}
+}
+
+// SetVar assigns a new value to the given variable. The name is inserted into
+// the statement verbatim and the value is quoted.
+func (c *SlaveConn) SetVar(name, val string) error {
+	return c.conn.Exec(fmt.Sprintf("SET %s=%q", name, val))
+}
+
+// runCmd writes the given command packet and waits for an OK result.
+func (c *SlaveConn) runCmd(data []byte) error {
+	if err := c.conn.WritePacket(data); err != nil {
+		return err
+	}
+	return c.conn.ReadResultOK()
+}
+
+// notEOF returns nil for io.EOF and the error itself otherwise.
+func notEOF(err error) error {
+	if err == io.EOF {
+		return nil
+	}
+	return err
+}
diff --git a/tools/buffer.go b/tools/buffer.go
new file mode 100644
index 0000000..c94d41b
--- /dev/null
+++ b/tools/buffer.go
@@ -0,0 +1,144 @@
+package tools
+
+import (
+	"encoding/binary"
+
+	"github.com/localhots/blt/mysql"
+)
+
+// Buffer is a simple wrapper over a slice of bytes with a cursor. It allows for
+// easy command building and results parsing.
+type Buffer struct {
+	data []byte
+	pos  int
+}
+
+// NewBuffer creates a new buffer from a given slice of bytes and sets the
+// cursor to the beginning. The slice is used as is, it is not copied.
+func NewBuffer(data []byte) *Buffer {
+	return &Buffer{data: data}
+}
+
+// NewCommandBuffer pre-allocates a buffer of a given size and reserves 4 bytes
+// at the beginning for the driver, these would be used to set command length
+// and sequence number. The cursor is placed right after the reserved bytes.
+func NewCommandBuffer(size int) *Buffer {
+	return &Buffer{data: make([]byte, size+4), pos: 4}
+}
+
+// Skip advances the cursor by N bytes.
+func (b *Buffer) Skip(n int) {
+	b.pos += n
+}
+
+// Read returns next N bytes and advances the cursor. If fewer than N bytes
+// remain, only the remaining bytes are returned; the cursor still moves by N.
+func (b *Buffer) Read(n int) []byte {
+	start := b.pos
+	b.pos += n
+
+	// Bound the result to the requested window. Slicing to the end of the
+	// buffer would hand out every remaining byte rather than the next N.
+	end := b.pos
+	if end > len(b.data) {
+		end = len(b.data)
+	}
+	return b.data[start:end]
+}
+
+// Cur returns remaining unread buffer.
+func (b *Buffer) Cur() []byte {
+	unread := b.data[b.pos:]
+	return unread
+}
+
+// Bytes returns the whole underlying slice, regardless of the cursor position.
+func (b *Buffer) Bytes() []byte {
+	return b.data
+}
+
+// ReadUint8 decodes a uint8 from the next byte and moves the cursor past it.
+func (b *Buffer) ReadUint8() uint8 {
+	raw := b.Read(1)
+	return mysql.DecodeUint8(raw)
+}
+
+// ReadUint16 decodes a uint16 from the next 2 bytes and moves the cursor past
+// them.
+func (b *Buffer) ReadUint16() uint16 {
+	raw := b.Read(2)
+	return mysql.DecodeUint16(raw)
+}
+
+// ReadUint24 decodes a 3-byte integer into a uint32 and moves the cursor past
+// those 3 bytes.
+func (b *Buffer) ReadUint24() uint32 {
+	raw := b.Read(3)
+	return mysql.DecodeUint24(raw)
+}
+
+// ReadUint32 decodes a uint32 from the next 4 bytes and moves the cursor past
+// them.
+func (b *Buffer) ReadUint32() uint32 {
+	raw := b.Read(4)
+	return mysql.DecodeUint32(raw)
+}
+
+// ReadUint48 decodes a 6-byte integer into a uint64 and moves the cursor past
+// those 6 bytes.
+func (b *Buffer) ReadUint48() uint64 {
+	raw := b.Read(6)
+	return mysql.DecodeUint48(raw)
+}
+
+// ReadUint64 decodes a uint64 from the next 8 bytes and moves the cursor past
+// them.
+func (b *Buffer) ReadUint64() uint64 {
+	raw := b.Read(8)
+	return mysql.DecodeUint64(raw)
+}
+
+// ReadUintLenEnc decodes a length-encoded integer from the unread part of the
+// buffer and moves the cursor by the size the decoder reports.
+func (b *Buffer) ReadUintLenEnc() (val uint64, isNull bool, size int) {
+	val, isNull, size = mysql.DecodeUintLenEnc(b.data[b.pos:])
+	b.pos += size
+	return val, isNull, size
+}
+
+// ReadStringNullTerm decodes a NULL-terminated string from the unread part of
+// the buffer and moves the cursor by the string length plus 1 extra byte.
+func (b *Buffer) ReadStringNullTerm() []byte {
+	text := mysql.DecodeStringNullTerm(b.data[b.pos:])
+	b.pos += len(text) + 1
+	return text
+}
+
+// ReadStringVarLen decodes a string of the given length and moves the cursor
+// by that many bytes.
+func (b *Buffer) ReadStringVarLen(n int) []byte {
+	raw := b.Read(n)
+	return mysql.DecodeStringVarLen(raw, n)
+}
+
+// ReadStringLenEnc reads a length-encoded string and advances cursor
+// accordingly.
+func (b *Buffer) ReadStringLenEnc() (str []byte, size int) {
+	str, size = mysql.DecodeStringLenEnc(b.Cur())
+	// The cursor moves by the size the decoder reports.
+	b.Skip(size)
+	return str, size
+}
+
+// ReadStringEOF reads remaining contents of the buffer as a new string. It does
+// not move the cursor.
+func (b *Buffer) ReadStringEOF() []byte {
+	return mysql.DecodeStringEOF(b.Cur())
+}
+
+// WriteByte writes given byte to the buffer and advances cursor by 1.
+func (b *Buffer) WriteByte(v byte) {
+	b.data[b.pos] = v
+	b.pos++
+}
+
+// WriteUint16 writes given uint16 value to the buffer in little-endian byte
+// order and advances cursor by 2.
+func (b *Buffer) WriteUint16(v uint16) {
+	binary.LittleEndian.PutUint16(b.data[b.pos:], v)
+	b.pos += 2
+}
+
+// WriteUint32 writes given uint32 value to the buffer in little-endian byte
+// order and advances cursor by 4.
+func (b *Buffer) WriteUint32(v uint32) {
+	binary.LittleEndian.PutUint32(b.data[b.pos:], v)
+	b.pos += 4
+}
+
+// WriteStringLenEnc writes a length-encoded string to the buffer and advances
+// cursor accordingly.
+//
+// The length prefix is a MySQL length-encoded integer: a single byte for
+// lengths below 251, otherwise a marker byte (0xFC, 0xFD or 0xFE) followed by
+// a 2, 3 or 8 byte little-endian length. The buffer must be large enough to
+// hold the prefix and the string.
+func (b *Buffer) WriteStringLenEnc(s string) {
+	n := uint64(len(s))
+	switch {
+	case n < 251:
+		// Short strings keep the one-byte prefix that was always written.
+		b.data[b.pos] = byte(n)
+		b.pos++
+	case n < 1<<16:
+		b.data[b.pos] = 0xFC
+		binary.LittleEndian.PutUint16(b.data[b.pos+1:], uint16(n))
+		b.pos += 3
+	case n < 1<<24:
+		b.data[b.pos] = 0xFD
+		b.data[b.pos+1] = byte(n)
+		b.data[b.pos+2] = byte(n >> 8)
+		b.data[b.pos+3] = byte(n >> 16)
+		b.pos += 4
+	default:
+		b.data[b.pos] = 0xFE
+		binary.LittleEndian.PutUint64(b.data[b.pos+1:], n)
+		b.pos += 9
+	}
+	b.pos += copy(b.data[b.pos:], s)
+}
+
+// WriteStringEOF writes given string to the buffer and advances cursor by its
+// length. No length prefix or terminator is written.
+func (b *Buffer) WriteStringEOF(s string) {
+	b.pos += copy(b.data[b.pos:], s)
+}