Move parser code to its own package

This commit is contained in:
2018-11-05 18:15:59 +01:00
parent 93f9ee8b52
commit 3a5271bca3
13 changed files with 182 additions and 80 deletions
+115
View File
@@ -0,0 +1,115 @@
package parser
import (
"encoding/binary"
)
// buffer is a simple wrapper over a slice of bytes with a cursor. It allows for
// easy command building and results parsing.
type buffer struct {
	// data is the underlying byte slice being read from or written to.
	data []byte
	// pos is the cursor: the index in data of the next byte to read or write.
	pos int
}

// skip moves the cursor n bytes forward without inspecting them.
func (b *buffer) skip(n int) {
	b.pos += n
}

// advance moves the cursor n bytes forward and returns exactly the n bytes it
// passed over. The result aliases the underlying data; it is not a copy.
//
// It panics if fewer than n unread bytes remain.
func (b *buffer) advance(n int) []byte {
	// Limit the capacity of the unread view to its length so that asking for
	// more bytes than remain panics instead of exposing spare capacity.
	rest := b.data[b.pos:len(b.data):len(b.data)]
	chunk := rest[:n:n]
	b.skip(n)
	return chunk
}

// cur returns the unread remainder of the buffer without moving the cursor.
func (b *buffer) cur() []byte {
	return b.data[b.pos:]
}
// newReadBuffer wraps data (e.g. command output) in a buffer whose cursor
// starts at the first byte. The slice is used as is, not copied.
func newReadBuffer(data []byte) *buffer {
	b := new(buffer)
	b.data = data
	return b
}
// readUint8 consumes 1 byte and returns it decoded by decodeUint8.
func (b *buffer) readUint8() uint8 {
	raw := b.advance(1)
	return decodeUint8(raw)
}
// readUint16 consumes 2 bytes and returns them decoded by decodeUint16.
func (b *buffer) readUint16() uint16 {
	raw := b.advance(2)
	return decodeUint16(raw)
}
// readUint24 consumes 3 bytes and returns them decoded by decodeUint24.
func (b *buffer) readUint24() uint32 {
	raw := b.advance(3)
	return decodeUint24(raw)
}
// readUint32 consumes 4 bytes and returns them decoded by decodeUint32.
func (b *buffer) readUint32() uint32 {
	raw := b.advance(4)
	return decodeUint32(raw)
}
// readUint48 consumes 6 bytes and returns them decoded by decodeUint48.
func (b *buffer) readUint48() uint64 {
	raw := b.advance(6)
	return decodeUint48(raw)
}
// readUint64 consumes 8 bytes and returns them decoded by decodeUint64.
func (b *buffer) readUint64() uint64 {
	raw := b.advance(8)
	return decodeUint64(raw)
}
// readUintLenEnc decodes a length-encoded integer at the cursor and moves the
// cursor past however many bytes decodeUintLenEnc reports it occupied.
func (b *buffer) readUintLenEnc() (val uint64, isNull bool) {
	v, null, consumed := decodeUintLenEnc(b.cur())
	b.skip(consumed)
	return v, null
}
// readStringNullTerm decodes a NUL-terminated string at the cursor using
// decodeStringNullTerm and moves the cursor past it.
func (b *buffer) readStringNullTerm() []byte {
	str := decodeStringNullTerm(b.cur())
	// One byte more than the returned string is skipped; presumably this
	// accounts for the 0x00 terminator.
	consumed := len(str) + 1
	b.skip(consumed)
	return str
}
// readStringVarLen consumes n bytes and returns them as decoded by
// decodeStringVarLen.
func (b *buffer) readStringVarLen(n int) []byte {
	raw := b.advance(n)
	return decodeStringVarLen(raw, n)
}
// readStringLenEnc decodes a length-encoded string at the cursor and moves the
// cursor past the total number of bytes decodeStringLenEnc reports.
func (b *buffer) readStringLenEnc() []byte {
	value, consumed := decodeStringLenEnc(b.cur())
	b.skip(consumed)
	return value
}
// readStringEOF returns everything from the cursor to the end of the buffer as
// decoded by decodeStringEOF. The cursor itself is left where it was.
func (b *buffer) readStringEOF() []byte {
	rest := b.cur()
	return decodeStringEOF(rest)
}
// newCommandBuffer pre-allocates a command buffer able to hold size payload
// bytes. The first four bytes are reserved for the command length and sequence
// number, so the cursor starts right after them.
func newCommandBuffer(size int) *buffer {
	const headerSize = 4
	return &buffer{
		data: make([]byte, headerSize+size),
		pos:  headerSize,
	}
}
// writeByte stores v at the cursor and moves the cursor one byte forward.
func (b *buffer) writeByte(v byte) {
	at := b.pos
	b.data[at] = v
	b.pos = at + 1
}
// writeUint16 stores v at the cursor as 2 little-endian bytes and moves the
// cursor past them.
func (b *buffer) writeUint16(v uint16) {
	dst := b.data[b.pos:]
	binary.LittleEndian.PutUint16(dst, v)
	b.skip(2)
}
// writeUint32 stores v at the cursor as 4 little-endian bytes and moves the
// cursor past them.
func (b *buffer) writeUint32(v uint32) {
	dst := b.data[b.pos:]
	binary.LittleEndian.PutUint32(dst, v)
	b.skip(4)
}
// writeString stores s at the cursor prefixed with a single length byte and
// moves the cursor past both.
//
// NOTE(review): the length is written as byte(len(s)), so a string longer than
// 255 bytes gets a wrapped-around length prefix.
func (b *buffer) writeString(s string) {
	b.writeByte(byte(len(s)))
	written := copy(b.data[b.pos:], s)
	b.pos += written
}
// writeStringEOF stores s at the cursor with no length prefix or terminator
// and moves the cursor past the bytes actually copied.
func (b *buffer) writeStringEOF(s string) {
	written := copy(b.data[b.pos:], s)
	b.pos += written
}
+112
View File
@@ -0,0 +1,112 @@
package parser
import (
"fmt"
)
// columnType is the one-byte column type code used by the MySQL protocol.
type columnType byte

// Spec: https://dev.mysql.com/doc/internals/en/com-query-response.html#column-type
const (
	colTypeDecimal    columnType = 0x00
	colTypeTiny       columnType = 0x01
	colTypeShort      columnType = 0x02
	colTypeLong       columnType = 0x03
	colTypeFloat      columnType = 0x04
	colTypeDouble     columnType = 0x05
	colTypeNull       columnType = 0x06
	colTypeTimestamp  columnType = 0x07
	colTypeLonglong   columnType = 0x08
	colTypeInt24      columnType = 0x09
	colTypeDate       columnType = 0x0a
	colTypeTime       columnType = 0x0b
	colTypeDatetime   columnType = 0x0c
	colTypeYear       columnType = 0x0d
	colTypeNewDate    columnType = 0x0e // Internal
	colTypeVarchar    columnType = 0x0f
	colTypeBit        columnType = 0x10
	colTypeTimestamp2 columnType = 0x11 // Internal
	colTypeDatetime2  columnType = 0x12 // Internal
	colTypeTime2      columnType = 0x13 // Internal
	colTypeJSON       columnType = 0xF5
	colTypeNewDecimal columnType = 0xF6
	colTypeEnum       columnType = 0xF7
	colTypeSet        columnType = 0xF8
	colTypeTinyblob   columnType = 0xF9
	colTypeMediumblob columnType = 0xFA
	colTypeLongblob   columnType = 0xFB
	colTypeBlob       columnType = 0xFC
	colTypeVarstring  columnType = 0xFD
	colTypeString     columnType = 0xFE
	colTypeGeometry   columnType = 0xFF
)

// columnTypeNames holds the display name of every known column type.
var columnTypeNames = map[columnType]string{
	colTypeDecimal:    "Decimal",
	colTypeTiny:       "Tiny",
	colTypeShort:      "Short",
	colTypeLong:       "Long",
	colTypeFloat:      "Float",
	colTypeDouble:     "Double",
	colTypeNull:       "Null",
	colTypeTimestamp:  "Timestamp",
	colTypeLonglong:   "Longlong",
	colTypeInt24:      "Int24",
	colTypeDate:       "Date",
	colTypeTime:       "Time",
	colTypeDatetime:   "Datetime",
	colTypeYear:       "Year",
	colTypeNewDate:    "NewDate",
	colTypeVarchar:    "Varchar",
	colTypeBit:        "Bit",
	colTypeTimestamp2: "Timestamp2",
	colTypeDatetime2:  "Datetime2",
	colTypeTime2:      "Time2",
	colTypeJSON:       "JSON",
	colTypeNewDecimal: "NewDecimal",
	colTypeEnum:       "Enum",
	colTypeSet:        "Set",
	colTypeTinyblob:   "Tinyblob",
	colTypeMediumblob: "Mediumblob",
	colTypeLongblob:   "Longblob",
	colTypeBlob:       "Blob",
	colTypeVarstring:  "Varstring",
	colTypeString:     "String",
	colTypeGeometry:   "Geometry",
}

// String returns the name of the column type, or "Unknown(<code>)" with the
// decimal type code when the value is not one of the declared constants.
func (ct columnType) String() string {
	if name, known := columnTypeNames[ct]; known {
		return name
	}
	return fmt.Sprintf("Unknown(%d)", ct)
}
+211
View File
@@ -0,0 +1,211 @@
package parser
import (
"encoding/binary"
"fmt"
)
// Protocol::FixedLengthInteger
// A fixed-length integer stores its value in a series of bytes with the least
// significant byte first (little endian).
// Spec: https://dev.mysql.com/doc/internals/en/integer.html#fixed-length-integer

// encodeUint8 writes v as an int<1> into the first byte of data.
func encodeUint8(data []byte, v uint8) {
	data[0] = byte(v)
}
// decodeUint8 reads an int<1> from the first byte of data.
func decodeUint8(data []byte) uint8 {
	first := data[0]
	return first
}
// encodeUint16 writes v as an int<2> (little endian) into the first two bytes
// of data.
func encodeUint16(data []byte, v uint16) {
	order := binary.LittleEndian
	order.PutUint16(data, v)
}
// decodeUint16 reads an int<2> (little endian) from the first two bytes of
// data.
func decodeUint16(data []byte) uint16 {
	order := binary.LittleEndian
	return order.Uint16(data)
}
// encodeUint24 writes the low three bytes of v as an int<3> (little endian)
// into data.
func encodeUint24(data []byte, v uint32) {
	const width = 3
	encodeVarLen64(data, uint64(v), width)
}
// decodeUint24 reads an int<3> (little endian) from the first three bytes of
// data.
func decodeUint24(data []byte) uint32 {
	const width = 3
	wide := decodeVarLen64(data, width)
	return uint32(wide)
}
// encodeUint32 writes v as an int<4> (little endian) into the first four bytes
// of data.
func encodeUint32(data []byte, v uint32) {
	order := binary.LittleEndian
	order.PutUint32(data, v)
}
// decodeUint32 reads an int<4> (little endian) from the first four bytes of
// data.
func decodeUint32(data []byte) uint32 {
	order := binary.LittleEndian
	return order.Uint32(data)
}
// encodeUint48 writes the low six bytes of v as an int<6> (little endian) into
// data.
func encodeUint48(data []byte, v uint64) {
	const width = 6
	encodeVarLen64(data, v, width)
}
// decodeUint48 reads an int<6> (little endian) from the first six bytes of
// data.
func decodeUint48(data []byte) uint64 {
	const width = 6
	return decodeVarLen64(data, width)
}
// encodeUint64 writes v as an int<8> (little endian) into the first eight
// bytes of data.
func encodeUint64(data []byte, v uint64) {
	order := binary.LittleEndian
	order.PutUint64(data, v)
}
// decodeUint64 reads an int<8> (little endian) from the first eight bytes of
// data.
func decodeUint64(data []byte) uint64 {
	order := binary.LittleEndian
	return order.Uint64(data)
}
// Protocol::LengthEncodedInteger
// An integer that consumes 1, 3, 4, or 9 bytes, depending on its numeric value.
// Spec: https://dev.mysql.com/doc/internals/en/integer.html#length-encoded-integer

// encodeUintLenEnc writes v into data as a length-encoded integer and returns
// the number of bytes written. When isNull is set, v is ignored and the single
// NULL marker byte 0xFB is written instead.
//
// To convert a number value into a length-encoded integer:
// If the value is < 251, it is stored as a 1-byte integer.
// If the value is ≥ 251 and < (2^16), it is stored as 0xFC + 2-byte integer.
// If the value is ≥ (2^16) and < (2^24), it is stored as 0xFD + 3-byte integer.
// If the value is ≥ (2^24) and < (2^64) it is stored as 0xFE + 8-byte integer.
// Note: up to MySQL 3.22, 0xFE was followed by a 4-byte integer.
//
// data must have room for the chosen encoding (up to 9 bytes); the function
// panics otherwise.
func encodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) {
	switch {
	case isNull:
		data[0] = 0xFB
		return 1
	case v < 0xFB:
		// 0xFB itself is reserved for NULL, so 251 must use the 2-byte form.
		data[0] = byte(v)
		return 1
	case v < 1<<16:
		data[0] = 0xFC
		binary.LittleEndian.PutUint16(data[1:], uint16(v))
		return 3
	case v < 1<<24:
		data[0] = 0xFD
		data[1] = byte(v)
		data[2] = byte(v >> 8)
		data[3] = byte(v >> 16)
		return 4
	default:
		data[0] = 0xFE
		binary.LittleEndian.PutUint64(data[1:], v)
		return 9
	}
}
// decodeUintLenEnc reads a length-encoded integer from the start of data. It
// returns the value, whether the first byte was the NULL marker, and the total
// number of bytes the encoding occupies.
//
// To convert a length-encoded integer into its numeric value, check the first
// byte:
// If it is < 0xFB, treat it as a 1-byte integer.
// If it is 0xFC, it is followed by a 2-byte integer.
// If it is 0xFD, it is followed by a 3-byte integer.
// If it is 0xFE, it is followed by an 8-byte integer.
// Depending on the context, the first byte may also have other meanings:
// If it is 0xFB, it represents a NULL in a ProtocolText::ResultsetRow; here
// isNull is set and the returned value is 0xFB.
// If it is 0xFF, it may be the first byte of an ERR_Packet; this function does
// not special-case it and returns it as a 1-byte integer.
// Caution:
// If the first byte of a packet is a length-encoded integer and its byte value
// is 0xFE, you must check the length of the packet to verify that it has enough
// space for an 8-byte integer.
// If not, it may be an EOF_Packet instead.
func decodeUintLenEnc(data []byte) (v uint64, isNull bool, size int) {
	first := data[0]
	if first == 0xFB {
		return 0xFB, true, 1
	}

	var width int // number of payload bytes following the marker
	switch first {
	case 0xFC:
		width = 2
	case 0xFD:
		width = 3
	case 0xFE:
		width = 8
	default:
		return uint64(first), false, 1
	}
	return decodeVarLen64(data[1:], width), false, width + 1
}
//
// Variable length encoding helpers
//

// encodeVarLen64 writes the s least significant bytes of v into data, least
// significant byte first.
func encodeVarLen64(data []byte, v uint64, s int) {
	for i, shift := 0, uint(0); i < s; i, shift = i+1, shift+8 {
		data[i] = byte(v >> shift)
	}
}
// decodeVarLen64 assembles an unsigned integer from the first s bytes of data,
// least significant byte first. A width of zero yields 0 and reads nothing.
// It panics if data holds fewer than s bytes.
func decodeVarLen64(data []byte, s int) uint64 {
	var v uint64
	for i := 0; i < s; i++ {
		v |= uint64(data[i]) << uint(i*8)
	}
	return v
}
// Protocol::NulTerminatedString
// Strings that are terminated by a 0x00 byte.

// decodeStringNullTerm returns a copy of the bytes in data that precede the
// first 0x00 byte; the terminator itself is not part of the result. If data
// contains no terminator, a copy of all of data is returned.
func decodeStringNullTerm(data []byte) []byte {
	for i, c := range data {
		if c == 0x00 {
			// Allocate exactly i bytes so the result carries no trailing
			// zero byte.
			s := make([]byte, i)
			copy(s, data[:i])
			return s
		}
	}

	s := make([]byte, len(data))
	copy(s, data)
	return s
}
// Protocol::VariableLengthString
// The length of the string is determined by another field or is calculated at
// runtime.
// Protocol::FixedLengthString
// Fixed-length strings have a known, hardcoded length.

// encodeStringVarLen copies str into data with no prefix or terminator. As
// with the built-in copy, at most len(data) bytes are written.
func encodeStringVarLen(data, str []byte) {
	_ = copy(data, str)
}
// decodeStringVarLen returns a copy of the first n bytes of data.
func decodeStringVarLen(data []byte, n int) []byte {
	head := data[:n]
	return decodeStringEOF(head)
}
// Protocol::LengthEncodedString
// A length encoded string is a string that is prefixed with length encoded
// integer describing the length of the string.
// It is a special case of Protocol::VariableLengthString

// decodeStringLenEnc reads a length-encoded string from the start of data. It
// returns a copy of the string bytes and the total number of bytes consumed
// (length prefix plus string). When the prefix is the NULL marker, the string
// is nil and only the marker byte is consumed.
func decodeStringLenEnc(data []byte) (str []byte, size int) {
	strlen, isNull, size := decodeUintLenEnc(data)
	if isNull {
		// 0xFB means NULL, not a string of length 251.
		return nil, size
	}
	end := size + int(strlen)
	s := make([]byte, int(strlen))
	copy(s, data[size:end])
	return s, end
}
// Protocol::RestOfPacketString
// If a string is the last component of a packet, its length can be calculated
// from the overall packet length minus the current position.

// decodeStringEOF returns a freshly allocated copy of all of data.
func decodeStringEOF(data []byte) []byte {
	out := make([]byte, len(data))
	_ = copy(out, data)
	return out
}
// trimString converts str to a string, cutting it at the first 0x00 byte if
// there is one.
//
// NOTE(review): the Println below looks like leftover debug output; it writes
// to stdout on every call. Removing it also requires dropping the "fmt" import
// from this file.
func trimString(str []byte) string {
	fmt.Println(str, string(str))
	end := len(str)
	for i := range str {
		if str[i] == 0x00 {
			end = i
			break
		}
	}
	return string(str[:end])
}
+9
View File
@@ -0,0 +1,9 @@
package parser
import "testing"
// TestEncodeUint8 checks that encodeUint8 stores the value in the first byte
// of the destination slice.
func TestEncodeUint8(t *testing.T) {
	buf := make([]byte, 1)
	encodeUint8(buf, 123)
	if buf[0] != 123 {
		t.Fatalf("encodeUint8 wrote %d, want 123", buf[0])
	}
}
+112
View File
@@ -0,0 +1,112 @@
package parser
import (
"fmt"
"strconv"
"strings"
)
// FormatDescription is a description of binary log format. It is decoded from
// a format description event by decodeFormatDescription.
type FormatDescription struct {
	// Version is read from the first two bytes of the event body.
	Version uint16
	// ServerVersion is the 50-byte server version field, cut at the first
	// 0x00 byte.
	ServerVersion string
	// CreateTimestamp is the 4-byte value following the server version.
	CreateTimestamp uint32
	// EventHeaderLength is the event header length in bytes; the reader uses
	// it for subsequent events when it is non-zero.
	EventHeaderLength uint8
	// EventTypeHeaderLengths is indexed by event type minus one (see
	// headerLen).
	EventTypeHeaderLengths []uint8
	// ServerDetails is derived from ServerVersion and the event trailer.
	ServerDetails ServerDetails
}
// ServerDetails contains server feature details.
type ServerDetails struct {
	// Flavor is the kind of MySQL-like database.
	Flavor Flavor
	// Version is the numeric server version as produced by
	// parseVersionNumber, e.g. 50719 for 5.7.19.
	Version int
	// ChecksumAlgorithm is the event checksum algorithm announced by the
	// server, or ChecksumAlgorithmUndefined when it is not known.
	ChecksumAlgorithm ChecksumAlgorithm
}
// Flavor defines the specific kind of MySQL-like database.
type Flavor string
// ChecksumAlgorithm is the binlog event checksum algorithm used by the server.
type ChecksumAlgorithm byte
const (
	// FlavorMySQL is the MySQL db flavor. It is an untyped string constant,
	// so it can be assigned to both Flavor and string values.
	FlavorMySQL = "MySQL"
)
const (
	// ChecksumAlgorithmNone means no checksum is appended.
	ChecksumAlgorithmNone ChecksumAlgorithm = 0x00
	// ChecksumAlgorithmCRC32 means a 4-byte checksum is appended at the end
	// of each event.
	ChecksumAlgorithmCRC32 ChecksumAlgorithm = 0x01
	// ChecksumAlgorithmUndefined is used when checksum algorithm is not known.
	ChecksumAlgorithmUndefined ChecksumAlgorithm = 0xFF
)
// decodeFormatDescription decodes the body of a format description event.
//
// The body consists of a 2-byte binlog version, a 50-byte server version
// string, a 4-byte create timestamp, a 1-byte event header length and a list
// of per-event-type header lengths running to the end of the event. On servers
// that support binlog checksums (5.6.1 and later) the last five bytes are a
// checksum algorithm byte followed by a 4-byte checksum; they are removed from
// EventTypeHeaderLengths and the algorithm is stored in ServerDetails.
//
// Spec: https://dev.mysql.com/doc/internals/en/format-description-event.html
func decodeFormatDescription(data []byte) FormatDescription {
	// Version from which the checksum trailer is present, in
	// parseVersionNumber form.
	const checksumMinVersion = 50601
	// [1] checksum algorithm + [4] checksum.
	const checksumTrailerLen = 5

	buf := newReadBuffer(data)
	fd := FormatDescription{
		Version:                buf.readUint16(),
		ServerVersion:          trimString(buf.readStringVarLen(50)),
		CreateTimestamp:        buf.readUint32(),
		EventHeaderLength:      buf.readUint8(),
		EventTypeHeaderLengths: buf.readStringEOF(),
	}
	fd.ServerDetails = ServerDetails{
		Flavor:            FlavorMySQL,
		Version:           parseVersionNumber(fd.ServerVersion),
		ChecksumAlgorithm: ChecksumAlgorithmUndefined,
	}

	// Only strip the trailer when it can actually be there; a short event
	// keeps the algorithm undefined instead of panicking.
	lengths := fd.EventTypeHeaderLengths
	if fd.ServerDetails.Version >= checksumMinVersion && len(lengths) >= checksumTrailerLen {
		cut := len(lengths) - checksumTrailerLen
		fd.ServerDetails.ChecksumAlgorithm = ChecksumAlgorithm(lengths[cut])
		fd.EventTypeHeaderLengths = lengths[:cut]
	}

	return fd
}
// tableIDSize returns the width in bytes of the table ID field for events of
// type et: 4 when that event type's header length is 6, otherwise 6.
func (fd FormatDescription) tableIDSize(et EventType) int {
	switch fd.headerLen(et) {
	case 6:
		return 4
	default:
		return 6
	}
}
// headerLen returns the header length recorded for event type et. Lengths are
// stored at index et-1. It returns 0 when no length is known for et, which
// covers type 0 and a format description that has not been received yet.
func (fd FormatDescription) headerLen(et EventType) int {
	// Convert before subtracting: EventType is a byte, so et-1 would wrap
	// around to 255 for et == 0.
	idx := int(et) - 1
	if idx < 0 || idx >= len(fd.EventTypeHeaderLengths) {
		return 0
	}
	return int(fd.EventTypeHeaderLengths[idx])
}
// String returns the name of the checksum algorithm, or "Unknown(<code>)" with
// the decimal value when it is not one of the declared constants.
func (ca ChecksumAlgorithm) String() string {
	if ca == ChecksumAlgorithmNone {
		return "None"
	}
	if ca == ChecksumAlgorithmCRC32 {
		return "CRC32"
	}
	if ca == ChecksumAlgorithmUndefined {
		return "Undefined"
	}
	return fmt.Sprintf("Unknown(%d)", ca)
}
// parseVersionNumber turns string version into a number just like the library
// mysql_get_server_version function does.
// Example: 5.7.19-log gets represented as 50719
// Spec: https://dev.mysql.com/doc/refman/8.0/en/mysql-get-server-version.html
//
// Each of the first three dot-separated components contributes its leading
// run of decimal digits; anything after it (such as "-log") is ignored.
// Missing or non-numeric components count as 0, so "5.6" yields 50600 and an
// empty string yields 0.
func parseVersionNumber(v string) int {
	var parts [3]int // major, minor, patch
	for i, tok := range strings.SplitN(v, ".", len(parts)) {
		end := 0
		for end < len(tok) && tok[end] >= '0' && tok[end] <= '9' {
			end++
		}
		// Atoi fails only on an empty digit run or on overflow; in both
		// cases the component is left at 0.
		if n, err := strconv.Atoi(tok[:end]); err == nil {
			parts[i] = n
		}
	}
	return parts[0]*10000 + parts[1]*100 + parts[2]
}
+92
View File
@@ -0,0 +1,92 @@
package parser
import (
"errors"
"github.com/localhots/pretty"
)
var (
	// ErrInvalidHeader is returned when event header cannot be parsed, e.g.
	// when the packet is shorter than the expected header length.
	ErrInvalidHeader = errors.New("Header is invalid")
)
// EventHeader represents binlog event header.
type EventHeader struct {
	// Timestamp is the first 4-byte field of the header.
	Timestamp uint32
	// Type is the event type code.
	Type EventType
	// ServerID is the 4-byte server ID field.
	ServerID uint32
	// EventLen is the 4-byte event length field.
	EventLen uint32
	// NextOffset is only read for binlog version 0 or >= 3; when non-zero it
	// becomes the reader's current offset.
	NextOffset uint32
	// Flags is only read for binlog version 0 or >= 3.
	Flags uint16
	// ExtraHeaders holds header bytes beyond the first 19; only read for
	// binlog version >= 4.
	ExtraHeaders []byte

	// eventBody is the rest of the packet after the header, without the
	// trailing checksum when CRC32 checksums are in use.
	eventBody []byte
}
// parseHeader decodes the event header at the start of data and then decodes
// the event body according to its type, updating the reader's format
// description, position and table map as a side effect.
//
// It returns ErrInvalidHeader when data is too short for the header, when the
// announced header length is smaller than the fixed header fields, or when the
// body is too short to hold the CRC32 checksum that is expected to trail it.
//
// Spec: https://dev.mysql.com/doc/internals/en/event-header-fields.html
func (r *Reader) parseHeader(data []byte) (*EventHeader, error) {
	const (
		baseFieldsLen = 13 // timestamp + type + server ID + event length
		fullFieldsLen = 19 // base fields + next offset + flags
		crc32Len      = 4
	)

	// Version 0 means no format description has been seen yet; such events
	// are parsed with the full header layout.
	hasOffsetAndFlags := r.format.Version == 0 || r.format.Version >= 3
	minLen := baseFieldsLen
	if hasOffsetAndFlags {
		minLen = fullFieldsLen
	}

	headerLen := r.headerLen()
	if headerLen < minLen || len(data) < headerLen {
		return nil, ErrInvalidHeader
	}

	buf := newReadBuffer(data)
	h := &EventHeader{
		Timestamp: buf.readUint32(),
		Type:      EventType(buf.readUint8()),
		ServerID:  buf.readUint32(),
		EventLen:  buf.readUint32(),
	}
	if hasOffsetAndFlags {
		h.NextOffset = buf.readUint32()
		h.Flags = buf.readUint16()
	}
	if r.format.Version >= 4 {
		h.ExtraHeaders = buf.readStringVarLen(headerLen - fullFieldsLen)
	}
	h.eventBody = buf.cur()

	// Strip the checksum before touching any reader state so that a
	// truncated event is rejected without side effects.
	csa := r.format.ServerDetails.ChecksumAlgorithm
	if h.Type != FormatDescriptionEvent && csa == ChecksumAlgorithmCRC32 {
		if len(h.eventBody) < crc32Len {
			return nil, ErrInvalidHeader
		}
		h.eventBody = h.eventBody[:len(h.eventBody)-crc32Len]
	}

	if h.NextOffset > 0 {
		r.state.Offset = uint64(h.NextOffset)
	}

	// NOTE(review): the pretty.Println calls below look like debug output.
	switch h.Type {
	case FormatDescriptionEvent:
		r.format = decodeFormatDescription(h.eventBody)
		pretty.Println(h.Type.String(), r.format)
	case RotateEvent:
		r.state = r.decodeRotateEvent(h.eventBody)
		pretty.Println(h.Type.String(), r.state)
	case TableMapEvent:
		tm := r.decodeTableMap(h.eventBody)
		r.tableMap[tm.TableID] = tm
	case WriteRowsEventV0, WriteRowsEventV1, WriteRowsEventV2,
		UpdateRowsEventV0, UpdateRowsEventV1, UpdateRowsEventV2,
		DeleteRowsEventV0, DeleteRowsEventV1, DeleteRowsEventV2:
		r.decodeRowsEvent(h.eventBody, h.Type)
	case XIDEvent, GTIDEvent:
		// TODO: Add support for these too
	case QueryEvent:
		// TODO: Handle schema changes
	}

	return h, nil
}
// headerLen returns the event header length from the current format
// description, or 19 when the format description does not specify one.
func (r *Reader) headerLen() int {
	const defaultHeaderLength = 19
	if n := int(r.format.EventHeaderLength); n > 0 {
		return n
	}
	return defaultHeaderLength
}
+13
View File
@@ -0,0 +1,13 @@
package parser
// decodeRotateEvent decodes the body of a rotate event into the position it
// points to. For binlog versions above 1 the body starts with an 8-byte
// offset; otherwise the offset is 4. The rest of the body is the file name.
func (r *Reader) decodeRotateEvent(data []byte) Position {
	buf := newReadBuffer(data)
	pos := Position{Offset: 4}
	if r.format.Version > 1 {
		pos.Offset = buf.readUint64()
	}
	pos.File = string(buf.readStringEOF())
	return pos
}
+169
View File
@@ -0,0 +1,169 @@
package parser
import (
"fmt"
"github.com/localhots/pretty"
)
// Rows contains a Rows Event.
type Rows struct {
	// EventType is the write/update/delete rows event variant being decoded.
	EventType EventType
	// TableID identifies the table; it is the key into the reader's table
	// map.
	TableID uint64
	// Flags is the 2-byte flags field following the table ID.
	Flags uint16
	// ExtraData is only read for V2 rows events.
	ExtraData []byte
	// ColumnCount is the number of columns, read as a length-encoded integer.
	ColumnCount uint64
	// ColumnBitmap1 is a (ColumnCount+7)/8 byte bitmap; a set bit marks a
	// column as present in the row data.
	ColumnBitmap1 []byte
	// ColumnBitmap2 is a second bitmap of the same size, only read for update
	// rows events (V1 and V2).
	ColumnBitmap2 []byte
	// Rows is meant to hold the decoded rows, one value slice per row.
	Rows [][]interface{}
	// TableMap points to the table map entry found for TableID.
	TableMap *TableMap
}
// rowsFlag is a bit flag of a rows event.
// NOTE(review): nothing visible here tests Rows.Flags against these values.
type rowsFlag uint16

const (
	rowsFlagEndOfStatement     rowsFlag = 0x0001
	rowsFlagNoForeignKeyChecks rowsFlag = 0x0002
	rowsFlagNoUniqueKeyChecks  rowsFlag = 0x0004
	rowsFlagRowHasColumns      rowsFlag = 0x0008

	// freeTableMapID is presumably a special table ID value; it is not
	// referenced by the visible code. TODO confirm.
	freeTableMapID = 0x00FFFFFF
)
// decodeRowsEvent decodes the body of a write/update/delete rows event of the
// given type: table ID, flags, optional extra data (V2 only), column count and
// column bitmap(s), followed by row data.
//
// It panics when no table map entry has been seen for the event's table ID.
func (r *Reader) decodeRowsEvent(data []byte, typ EventType) {
	buf := newReadBuffer(data)
	rows := Rows{EventType: typ}
	idSize := r.format.tableIDSize(typ)
	if idSize == 6 {
		rows.TableID = buf.readUint48()
	} else {
		rows.TableID = uint64(buf.readUint32())
	}

	rows.Flags = buf.readUint16()

	if typ.isEither(WriteRowsEventV2, UpdateRowsEventV2, DeleteRowsEventV2) {
		// Extra data length is part of extra data, deduct 2 bytes as they
		// already store its length. The subtraction is done on int so that a
		// malformed length below 2 cannot wrap around to a huge value.
		extraLen := int(buf.readUint16()) - 2
		if extraLen < 0 {
			extraLen = 0
		}
		rows.ExtraData = buf.readStringVarLen(extraLen)
	}

	rows.ColumnCount, _ = buf.readUintLenEnc()
	bitmapLen := int(rows.ColumnCount+7) / 8
	rows.ColumnBitmap1 = buf.readStringVarLen(bitmapLen)
	if typ.isEither(UpdateRowsEventV2, UpdateRowsEventV1) {
		rows.ColumnBitmap2 = buf.readStringVarLen(bitmapLen)
	}

	tm, ok := r.tableMap[rows.TableID]
	if !ok {
		panic(fmt.Errorf("Out of sync: no table map definition for ID=%d", rows.TableID))
	}
	rows.TableMap = &tm

	// NOTE(review): looks like debug output.
	pretty.Println(typ.String(), rows, tm, buf.cur())
	rows.decodeRows(buf, rows.ColumnBitmap1)
}
// decodeRows decodes one row from buf and appends it to r.Rows.
//
// bm is the column-present bitmap: only columns whose bit is set appear in the
// row. The row starts with a NULL bitmap holding one bit per present column,
// followed by the values of the present, non-NULL columns. Columns that are
// absent or NULL are left as nil.
//
// It panics if a value cannot be decoded.
func (r *Rows) decodeRows(buf *buffer, bm []byte) {
	count := 0
	for i := 0; i < int(r.ColumnCount); i++ {
		if isBitSet(bm, i) {
			count++
		}
	}
	count = (count + 7) / 8

	nullBM := buf.readStringVarLen(count)
	nullCnt := 0
	row := make([]interface{}, r.ColumnCount)

	// NOTE(review): looks like debug output.
	pretty.Println(count, nullBM)
	for i := 0; i < int(r.ColumnCount); i++ {
		if !isBitSet(bm, i) {
			continue
		}

		// The NULL bitmap is indexed by position among present columns, not
		// by column number.
		isNull := isBitSet(nullBM, nullCnt)
		nullCnt++
		if isNull {
			continue
		}

		val, err := r.decodeValue(buf, columnType(r.TableMap.ColumnTypes[i]), r.TableMap.ColumnMeta[i])
		if err != nil {
			panic(err)
		}
		row[i] = val
	}

	// Keep the decoded row instead of discarding it.
	r.Rows = append(r.Rows, row)
}
// decodeValue is a placeholder for per-type value decoding. For the column
// types listed below it only prints the type name; it reads nothing from buf,
// ignores meta and always returns (nil, nil). The types NewDate, Timestamp2,
// Datetime2 and Time2 are not listed and produce no output.
func (r *Rows) decodeValue(buf *buffer, ct columnType, meta uint16) (interface{}, error) {
	switch ct {
	case colTypeDecimal, colTypeTiny, colTypeShort, colTypeLong,
		colTypeFloat, colTypeDouble, colTypeNull, colTypeTimestamp,
		colTypeLonglong, colTypeInt24, colTypeDate, colTypeTime,
		colTypeDatetime, colTypeYear, colTypeVarchar, colTypeBit,
		colTypeJSON, colTypeNewDecimal, colTypeEnum, colTypeSet,
		colTypeTinyblob, colTypeMediumblob, colTypeLongblob, colTypeBlob,
		colTypeVarstring, colTypeString, colTypeGeometry:
		pretty.Println("Type", ct.String())
	}
	return nil, nil
}
// isBitSet reports whether bit i of the bitmap bm is set. Bits are numbered
// from the least significant bit of bm[0] upwards.
func isBitSet(bm []byte, i int) bool {
	mask := byte(1) << (uint(i) & 7)
	return bm[i>>3]&mask != 0
}
+59
View File
@@ -0,0 +1,59 @@
package parser
// TableMap describes a table as announced by a table map event: its ID, names
// and column definitions.
type TableMap struct {
	// TableID is the numeric ID that rows events use to refer to this table.
	TableID uint64
	// Flags is the 2-byte flags field following the table ID.
	Flags uint16
	// SchemaName is the name of the schema the table belongs to.
	SchemaName string
	// TableName is the name of the table.
	TableName string
	// ColumnCount is the number of columns, read as a length-encoded integer.
	ColumnCount uint64
	// ColumnTypes holds one column type code per column.
	ColumnTypes []byte
	// ColumnMeta holds one metadata value per column as produced by
	// decodeColumnMeta; it is zero for types without metadata.
	ColumnMeta []uint16
	// NullBitmask is the trailing bitmap of the event, one bit per column.
	NullBitmask []byte
}
// decodeTableMap decodes the body of a table map event.
//
// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html
func (r *Reader) decodeTableMap(data []byte) TableMap {
	buf := newReadBuffer(data)
	var tm TableMap
	idSize := r.format.tableIDSize(TableMapEvent)
	if idSize == 6 {
		tm.TableID = buf.readUint48()
	} else {
		tm.TableID = uint64(buf.readUint32())
	}

	tm.Flags = buf.readUint16()
	tm.SchemaName = string(buf.readStringLenEnc())
	buf.skip(1) // Always 0x00
	tm.TableName = string(buf.readStringLenEnc())
	buf.skip(1) // Always 0x00
	tm.ColumnCount, _ = buf.readUintLenEnc()
	tm.ColumnTypes = buf.readStringVarLen(int(tm.ColumnCount))
	tm.ColumnMeta = decodeColumnMeta(buf.readStringLenEnc(), tm.ColumnTypes)
	// One bit per column, rounded up to whole bytes.
	tm.NullBitmask = buf.readStringVarLen(int(tm.ColumnCount+7) / 8)

	return tm
}
// decodeColumnMeta extracts per-column metadata from data, walking the column
// types in cols in order. Each type consumes a type-specific number of bytes:
//   - String, NewDecimal: 2 bytes, first byte in the high half;
//   - Varchar, Varstring, Bit: 2 bytes, little endian;
//   - Float, Double, Blob, Geometry, JSON, Time2, Datetime2, Timestamp2: 1 byte;
//   - every other type: no bytes, metadata stays 0.
//
// The result has one entry per column.
func decodeColumnMeta(data []byte, cols []byte) []uint16 {
	meta := make([]uint16, len(cols))
	offset := 0
	for i := range cols {
		width := 0
		switch columnType(cols[i]) {
		case colTypeString, colTypeNewDecimal:
			// TODO: Is that correct?
			meta[i] = uint16(data[offset])<<8 | uint16(data[offset+1])
			width = 2
		case colTypeVarchar, colTypeVarstring, colTypeBit:
			// TODO: Is that correct?
			meta[i] = decodeUint16(data[offset:])
			width = 2
		case colTypeFloat, colTypeDouble, colTypeBlob, colTypeGeometry, colTypeJSON,
			colTypeTime2, colTypeDatetime2, colTypeTimestamp2:
			meta[i] = uint16(data[offset])
			width = 1
		}
		offset += width
	}
	return meta
}
+212
View File
@@ -0,0 +1,212 @@
package parser
import (
"fmt"
)
// EventType defines a binary log event type.
type EventType byte

// Spec: https://dev.mysql.com/doc/internals/en/event-classes-and-types.html
const (
	// UnknownEvent is an event that should never occur.
	UnknownEvent EventType = 0
	// StartEventV3 is the Start_event of binlog format 3.
	StartEventV3 EventType = 1
	// QueryEvent is created for each query that modifies the database, unless
	// the query is logged row-based.
	QueryEvent EventType = 2
	// StopEvent is written to the log files under these circumstances:
	// A master writes the event to the binary log when it shuts down.
	// A slave writes the event to the relay log when it shuts down or when a
	// RESET SLAVE statement is executed.
	StopEvent EventType = 3
	// RotateEvent is written at the end of the file that points to the next
	// file in the sequence. It is written when a binary log file exceeds a
	// size limit.
	RotateEvent EventType = 4
	// IntvarEvent will be created just before a Query_event, if the query uses
	// one of the variables LAST_INSERT_ID or INSERT_ID.
	IntvarEvent EventType = 5
	// LoadEvent ...
	LoadEvent EventType = 6
	// SlaveEvent ...
	SlaveEvent EventType = 7
	// CreateFileEvent ...
	CreateFileEvent EventType = 8
	// AppendBlockEvent is created to contain the file data.
	AppendBlockEvent EventType = 9
	// ExecLoadEvent ...
	ExecLoadEvent EventType = 10
	// DeleteFileEvent occurs when the LOAD DATA failed on the master.
	// This event notifies the slave not to do the load and to delete the
	// temporary file.
	DeleteFileEvent EventType = 11
	// NewLoadEvent ...
	NewLoadEvent EventType = 12
	// RandEvent logs random seed used by the next RAND(), and by PASSWORD()
	// in 4.1.0.
	RandEvent EventType = 13
	// UserVarEvent is written every time a statement uses a user variable;
	// precedes other events for the statement. Indicates the value to use for
	// the user variable in the next statement. This is written only before a
	// QUERY_EVENT and is not used with row-based logging.
	UserVarEvent EventType = 14
	// FormatDescriptionEvent is saved by threads which read it, as they need it
	// for future use (to decode the ordinary events).
	FormatDescriptionEvent EventType = 15
	// XIDEvent is generated for a commit of a transaction that modifies one or
	// more tables of an XA-capable storage engine.
	XIDEvent EventType = 16
	// BeginLoadQueryEvent is for the first block of file to be loaded, its only
	// difference from Append_block event is that this event creates or
	// truncates existing file before writing data.
	BeginLoadQueryEvent EventType = 17
	// ExecuteLoadQueryEvent is responsible for LOAD DATA execution, it is
	// similar to Query_event but before executing the query it substitutes
	// original filename in LOAD DATA query with name of temporary file.
	ExecuteLoadQueryEvent EventType = 18
	// TableMapEvent is used in row-based mode where it precedes every row
	// operation event and maps a table definition to a number. The table
	// definition consists of database name, table name, and column definitions.
	TableMapEvent EventType = 19
	// WriteRowsEventV0 represents inserted rows. Used in MySQL 5.1.0 to 5.1.15.
	WriteRowsEventV0 EventType = 20
	// UpdateRowsEventV0 represents updated rows. It contains both old and new
	// versions. Used in MySQL 5.1.0 to 5.1.15.
	UpdateRowsEventV0 EventType = 21
	// DeleteRowsEventV0 represents deleted rows. Used in MySQL 5.1.0 to 5.1.15.
	DeleteRowsEventV0 EventType = 22
	// WriteRowsEventV1 represents inserted rows. Used in MySQL 5.1.15 to 5.6.
	WriteRowsEventV1 EventType = 23
	// UpdateRowsEventV1 represents updated rows. It contains both old and new
	// versions. Used in MySQL 5.1.15 to 5.6.
	UpdateRowsEventV1 EventType = 24
	// DeleteRowsEventV1 represents deleted rows. Used in MySQL 5.1.15 to 5.6.
	DeleteRowsEventV1 EventType = 25
	// IncidentEvent represents an incident, an occurrence out of the ordinary,
	// that happened on the master. The event is used to inform the slave that
	// something out of the ordinary happened on the master that might cause the
	// database to be in an inconsistent state.
	IncidentEvent EventType = 26
	// HeartbeetEvent is a replication event used to assure the slave that the
	// master is alive. The event is originated by master's dump thread and sent
	// straight to slave without being logged. Slave itself does not store it in
	// relay log but rather uses its data for immediate checks and throws away
	// the event.
	HeartbeetEvent EventType = 27
	// IgnorableEvent is a kind of event that could be ignored.
	IgnorableEvent EventType = 28
	// RowsQueryEvent is a subclass of the IgnorableEvent, to record the
	// original query for the rows events in RBR.
	RowsQueryEvent EventType = 29
	// WriteRowsEventV2 represents inserted rows. Used starting from MySQL 5.6.
	WriteRowsEventV2 EventType = 30
	// UpdateRowsEventV2 represents updated rows. It contains both old and new
	// versions. Used starting from MySQL 5.6.
	UpdateRowsEventV2 EventType = 31
	// DeleteRowsEventV2 represents deleted rows. Used starting from MySQL 5.6.
	DeleteRowsEventV2 EventType = 32
	// GTIDEvent is an event that contains latest GTID.
	// GTID stands for Global Transaction IDentifier. It is composed of two
	// parts:
	//   * SID for Source Identifier, and
	//   * GNO for Group Number. The basic idea is to associate an identifier, the
	//     Global Transaction IDentifier or GTID, to every transaction. When a
	//     transaction is copied to a slave, re-executed on the slave, and written
	//     to the slave's binary log, the GTID is preserved. When a slave connects
	//     to a master, the slave uses GTIDs instead of (file, offset).
	GTIDEvent EventType = 33
	// AnonymousGTIDEvent is a subclass of GTIDEvent.
	AnonymousGTIDEvent EventType = 34
	// PreviousGTIDsEvent is a subclass of GTIDEvent.
	PreviousGTIDsEvent EventType = 35
)

// eventTypeNames holds the display name of every declared event type, indexed
// by the event type code.
var eventTypeNames = [...]string{
	UnknownEvent:           "UnknownEvent",
	StartEventV3:           "StartEventV3",
	QueryEvent:             "QueryEvent",
	StopEvent:              "StopEvent",
	RotateEvent:            "RotateEvent",
	IntvarEvent:            "IntvarEvent",
	LoadEvent:              "LoadEvent",
	SlaveEvent:             "SlaveEvent",
	CreateFileEvent:        "CreateFileEvent",
	AppendBlockEvent:       "AppendBlockEvent",
	ExecLoadEvent:          "ExecLoadEvent",
	DeleteFileEvent:        "DeleteFileEvent",
	NewLoadEvent:           "NewLoadEvent",
	RandEvent:              "RandEvent",
	UserVarEvent:           "UserVarEvent",
	FormatDescriptionEvent: "FormatDescriptionEvent",
	XIDEvent:               "XIDEvent",
	BeginLoadQueryEvent:    "BeginLoadQueryEvent",
	ExecuteLoadQueryEvent:  "ExecuteLoadQueryEvent",
	TableMapEvent:          "TableMapEvent",
	WriteRowsEventV0:       "WriteRowsEventV0",
	UpdateRowsEventV0:      "UpdateRowsEventV0",
	DeleteRowsEventV0:      "DeleteRowsEventV0",
	WriteRowsEventV1:       "WriteRowsEventV1",
	UpdateRowsEventV1:      "UpdateRowsEventV1",
	DeleteRowsEventV1:      "DeleteRowsEventV1",
	IncidentEvent:          "IncidentEvent",
	HeartbeetEvent:         "HeartbeetEvent",
	IgnorableEvent:         "IgnorableEvent",
	RowsQueryEvent:         "RowsQueryEvent",
	WriteRowsEventV2:       "WriteRowsEventV2",
	UpdateRowsEventV2:      "UpdateRowsEventV2",
	DeleteRowsEventV2:      "DeleteRowsEventV2",
	GTIDEvent:              "GTIDEvent",
	AnonymousGTIDEvent:     "AnonymousGTIDEvent",
	PreviousGTIDsEvent:     "PreviousGTIDsEvent",
}

// isEither reports whether et equals any of the given types.
func (et EventType) isEither(types ...EventType) bool {
	for i := range types {
		if types[i] == et {
			return true
		}
	}
	return false
}

// String returns the name of the event type, or "Unknown(<code>)" with the
// decimal type code when the value is not one of the declared constants.
func (et EventType) String() string {
	if int(et) < len(eventTypeNames) {
		return eventTypeNames[et]
	}
	return fmt.Sprintf("Unknown(%d)", et)
}
+197
View File
@@ -0,0 +1,197 @@
package parser
import (
"context"
"database/sql/driver"
"fmt"
"io"
"os"
"github.com/juju/errors"
"github.com/localhots/gobelt/log"
"github.com/localhots/mysql"
)
// Reader reads binary log events from a MySQL connection on which it has
// registered itself as a slave and requested a binlog dump.
type Reader struct {
	// conn is the extended driver connection used for raw packet I/O.
	conn *mysql.ExtendedConn
	// conf is the configuration the reader was created with.
	conf Config
	// state is the current binlog position; it is updated from event headers
	// and rotate events.
	state Position
	// format is the most recently received format description.
	format FormatDescription
	// tableMap holds the table map events seen so far, keyed by table ID.
	tableMap map[uint64]TableMap
}
// Config holds the settings used to register with the server and to choose
// where in the binary log to start reading.
type Config struct {
	// ServerID is sent to the server in the register-slave and binlog-dump
	// commands.
	ServerID uint32
	// File is the binary log file name to start reading from.
	File string
	// Offset is the position within File to start from; values below 4 are
	// raised to 4, the offset of the first event.
	Offset uint32
	// Hostname is sent to the server when registering; when empty, the OS
	// host name is used.
	Hostname string
}
// Position is a location in the binary log: a file name and a byte offset
// within that file.
type Position struct {
	// File is the binary log file name.
	File string
	// Offset is the byte offset within File.
	Offset uint64
}
const (
	// Values of the first byte of a packet received from the server, as
	// dispatched on by ReadEventHeader.

	// resultOK marks a packet whose remainder is a binlog event.
	resultOK byte = 0x00
	// resultEOF marks an EOF packet.
	resultEOF byte = 0xFE
	// resultERR marks an error packet.
	resultERR byte = 0xFF
)
// NewReader creates a Reader on top of an existing driver connection. It
// fills in conf.Hostname from the OS when empty, then disables the binlog
// checksum for the session, registers as a slave and starts the binlog dump.
// The first failing step aborts construction and its error is returned.
func NewReader(conn driver.Conn, conf Config) (*Reader, error) {
	if conf.Hostname == "" {
		name, err := os.Hostname()
		if err != nil {
			return nil, err
		}
		conf.Hostname = name
	}

	extconn, err := mysql.ExtendConn(conn)
	if err != nil {
		return nil, err
	}

	r := &Reader{
		conn:     extconn,
		conf:     conf,
		tableMap: make(map[uint64]TableMap),
	}

	// Handshake steps, run in order; each failure is annotated with its
	// message.
	steps := []struct {
		run     func() error
		failure string
	}{
		{r.disableChecksum, "Failed to disable binlog checksum"},
		{r.registerSlave, "Failed to register slave server"},
		{r.binlogDump, "Failed to start binlog dump"},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			return nil, errors.Annotate(err, step.failure)
		}
	}

	return r, nil
}
// Connect opens a new MySQL connection for the given DSN and creates a Reader
// on top of it. If the reader cannot be set up, the connection is closed
// before the error is returned.
func Connect(dsn string, conf Config) (*Reader, error) {
	conn, err := (&mysql.MySQLDriver{}).Open(dsn)
	if err != nil {
		return nil, err
	}

	r, err := NewReader(conn, conf)
	if err != nil {
		// The connection was opened here, so it must not outlive the failed
		// setup. The setup error is the one worth reporting; a close error
		// is deliberately dropped.
		_ = conn.Close()
		return nil, err
	}
	return r, nil
}
// ReadEventHeader reads next event from the log and decodes its header. Header
// is then used to decode the event.
//
// The first byte of the packet decides how it is handled: an OK byte is
// followed by the event, an ERR byte is turned into an error. For an EOF
// packet or an unexpected first byte the condition is logged and (nil, nil)
// is returned. An empty packet yields ErrInvalidHeader.
func (r *Reader) ReadEventHeader(ctx context.Context) (*EventHeader, error) {
	data, err := r.conn.ReadPacket()
	if err != nil {
		return nil, err
	}
	if len(data) == 0 {
		// Nothing to dispatch on; avoid indexing an empty packet.
		return nil, ErrInvalidHeader
	}

	switch data[0] {
	case resultOK:
		return r.parseHeader(data[1:])
	case resultERR:
		return nil, r.conn.HandleErrorPacket(data)
	case resultEOF:
		log.Debug(ctx, "EOF received")
		return nil, nil
	default:
		log.Errorf(ctx, "Unexpected header: %x", data[0])
		return nil, nil
	}
}
// registerSlave sends the COM_REGISTER_SLAVE command with the configured
// server ID and hostname and waits for an OK result.
//
// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html
func (r *Reader) registerSlave() error {
	const comRegisterSlave byte = 21
	r.conn.ResetSequence()

	host := r.conf.Hostname
	payloadLen := 1 + // command
		4 + // server ID
		1 + len(host) + // hostname with its length byte
		1 + // username length byte
		1 + // password length byte
		2 + // port
		4 + // replication rank
		4 // master ID
	buf := newCommandBuffer(payloadLen)
	buf.writeByte(comRegisterSlave)
	buf.writeUint32(r.conf.ServerID)
	buf.writeString(host)
	// The rest of the payload would be zeroes, consider following code for
	// reference:
	//
	// buf.writeString(username)
	// buf.writeString(password)
	// buf.writeUint16(port)
	// buf.writeUint32(replicationRank)
	// buf.writeUint32(masterID)

	return r.runCmd(buf)
}
// binlogDump sets the reader's position from the configuration and sends the
// COM_BINLOG_DUMP command asking the server to stream events from there.
//
// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func (r *Reader) binlogDump() error {
	const (
		comBinlogDump    byte = 18
		firstEventOffset      = 4
	)
	r.conn.ResetSequence()

	// First event offset is 4
	offset := uint64(r.conf.Offset)
	if offset < firstEventOffset {
		offset = firstEventOffset
	}
	r.state.File = r.conf.File
	r.state.Offset = offset

	file := r.state.File
	buf := newCommandBuffer(1 + 4 + 2 + 4 + len(file))
	buf.writeByte(comBinlogDump)
	buf.writeUint32(uint32(offset))
	buf.skip(2) // Flags
	buf.writeUint32(r.conf.ServerID)
	buf.writeStringEOF(file)

	return r.runCmd(buf)
}
// runCmd writes the whole command buffer as one packet and then reads the
// server's response, expecting an OK result.
func (r *Reader) runCmd(buf *buffer) error {
	if err := r.conn.WritePacket(buf.data); err != nil {
		return err
	}
	return r.conn.ReadResultOK()
}
// disableChecksum reads the server's BINLOG_CHECKSUM variable and, unless it
// is already "NONE", sets the session variable @master_binlog_checksum to
// "NONE".
func (r *Reader) disableChecksum() error {
	cs, err := r.getVar("BINLOG_CHECKSUM")
	switch {
	case err != nil:
		return err
	case cs == "NONE":
		return nil
	default:
		return r.setVar("@master_binlog_checksum", "NONE")
	}
}
// getVar returns the value of the server variable with the given name, using
// SHOW VARIABLES LIKE. The value is taken from the second column of the first
// result row.
//
// io.EOF from the query or from fetching the row (no matching variable) is not
// treated as an error: the result is then an empty string with a nil error.
// An error is returned when the row has fewer than two columns or the value
// has an unexpected type.
func (r *Reader) getVar(name string) (string, error) {
	rows, err := r.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{})
	if err != nil {
		return "", notEOF(err)
	}
	defer rows.Close()

	res := make([]driver.Value, len(rows.Columns()))
	if err := rows.Next(res); err != nil {
		return "", notEOF(err)
	}
	if len(res) < 2 {
		return "", fmt.Errorf("reading variable %q: expected 2 columns, got %d", name, len(res))
	}

	// Check the dynamic type instead of asserting it, so an unexpected driver
	// value becomes an error rather than a panic.
	switch v := res[1].(type) {
	case []byte:
		return string(v), nil
	case string:
		return v, nil
	default:
		return "", fmt.Errorf("reading variable %q: unexpected value type %T", name, v)
	}
}
// setVar executes a SET statement assigning val, formatted with %q, to the
// variable name.
// NOTE(review): name is inserted verbatim; callers must pass trusted names.
func (r *Reader) setVar(name, val string) error {
	stmt := fmt.Sprintf("SET %s=%q", name, val)
	return r.conn.Exec(stmt)
}
// notEOF filters out io.EOF: it returns nil when err is exactly io.EOF and
// returns err unchanged otherwise.
func notEOF(err error) error {
	if err != io.EOF {
		return err
	}
	return nil
}