1
0
Fork 0

Initial commit

This commit is contained in:
Gregory Eremin 2018-07-29 19:20:30 +02:00
commit f3961d403c
14 changed files with 1297 additions and 0 deletions

18
LICENCE Normal file
View File

@ -0,0 +1,18 @@
Copyright 2018 Gregory Eremin
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

21
README.md Normal file
View File

@ -0,0 +1,21 @@
# BLT
MySQL binary log parser.
### WIP
Work in progress, some events are not fully supported.
*TODO:*
[x] FormatDescriptionEvent
[x] TableMapEvent
[x] RotateEvent
[ ] RowsEvent
[ ] XIDEvent
[ ] GTIDEvent
[ ] QueryEvent
### Licence
MIT

115
buffer.go Normal file
View File

@ -0,0 +1,115 @@
package blt
import (
"encoding/binary"
)
// buffer is a simple wrapper over a slice of bytes with a cursor. It allows for
// easy command building and results parsing.
type buffer struct {
data []byte
pos int
}
// skip next n bytes
func (b *buffer) skip(n int) {
b.pos += n
}
// advance skips next N bytes and returns them
func (b *buffer) advance(n int) []byte {
b.skip(n)
return b.data[b.pos-n:]
}
// cur returns remaining unread buffer.
func (b *buffer) cur() []byte {
return b.data[b.pos:]
}
// newReadBuffer creates a buffer with command output.
func newReadBuffer(data []byte) *buffer {
return &buffer{data: data}
}
func (b *buffer) readUint8() uint8 {
return decodeUint8(b.advance(1))
}
func (b *buffer) readUint16() uint16 {
return decodeUint16(b.advance(2))
}
func (b *buffer) readUint24() uint32 {
return decodeUint24(b.advance(3))
}
func (b *buffer) readUint32() uint32 {
return decodeUint32(b.advance(4))
}
func (b *buffer) readUint48() uint64 {
return decodeUint48(b.advance(6))
}
func (b *buffer) readUint64() uint64 {
return decodeUint64(b.advance(8))
}
func (b *buffer) readUintLenEnc() (val uint64, isNull bool) {
var size int
val, isNull, size = decodeUintLenEnc(b.cur())
b.skip(size)
return
}
func (b *buffer) readStringNullTerm() []byte {
str := decodeStringNullTerm(b.cur())
b.skip(len(str) + 1)
return str
}
func (b *buffer) readStringVarLen(n int) []byte {
return decodeStringVarLen(b.advance(n), n)
}
func (b *buffer) readStringLenEnc() []byte {
str, size := decodeStringLenEnc(b.cur())
b.skip(size)
return str
}
func (b *buffer) readStringEOF() []byte {
return decodeStringEOF(b.cur())
}
// Pre-allocate command buffer. First four bytes would be used to set command
// length and sequence number.
func newCommandBuffer(size int) *buffer {
return &buffer{data: make([]byte, size+4), pos: 4}
}
func (b *buffer) writeByte(v byte) {
b.data[b.pos] = v
b.pos++
}
func (b *buffer) writeUint16(v uint16) {
binary.LittleEndian.PutUint16(b.data[b.pos:], v)
b.pos += 2
}
func (b *buffer) writeUint32(v uint32) {
binary.LittleEndian.PutUint32(b.data[b.pos:], v)
b.pos += 4
}
func (b *buffer) writeString(s string) {
b.data[b.pos] = byte(len(s))
b.pos++
b.pos += copy(b.data[b.pos:], s)
}
func (b *buffer) writeStringEOF(s string) {
b.pos += copy(b.data[b.pos:], s)
}

59
cmd/main.go Normal file
View File

@ -0,0 +1,59 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"time"
"github.com/localhots/blt"
"github.com/localhots/gobelt/log"
)
func main() {
dsn := flag.String("dsn", "", "Database source name")
id := flag.Uint("id", 1000, "Server ID (arbitrary, unique)")
file := flag.String("file", "", "Binary log file name")
offset := flag.Uint("offset", 0, "Log offset in bytes")
flag.Parse()
ctx := context.Background()
validate((*dsn != ""), "Database source name is not set")
validate((*id != 0), "Server ID is not set")
validate((*file != ""), "Binary log file is not set")
conf := blt.Config{
ServerID: uint32(*id),
File: *file,
Offset: uint32(*offset),
}
reader, err := blt.Connect(*dsn, conf)
if err != nil {
log.Fatalf(ctx, "Failed to establish connection: %v", err)
}
off := conf.Offset
for i := 0; i < 100; i++ {
// for {
evt, err := reader.ReadEventHeader(ctx)
if err != nil {
log.Fatalf(ctx, "Failed to read event: %v", err)
}
ts := time.Unix(int64(evt.Timestamp), 0).Format(time.RFC3339)
log.Info(ctx, "Event received", log.F{
"type": evt.Type,
"timestamp": ts,
"offset": off,
})
off = evt.NextOffset
}
}
func validate(cond bool, msg string) {
if !cond {
fmt.Println(msg)
flag.Usage()
os.Exit(2)
}
}

112
column_types.go Normal file
View File

@ -0,0 +1,112 @@
package blt
import (
"fmt"
)
type columnType byte
// Spec: https://dev.mysql.com/doc/internals/en/com-query-response.html#column-type
const (
colTypeDecimal columnType = 0x00
colTypeTiny columnType = 0x01
colTypeShort columnType = 0x02
colTypeLong columnType = 0x03
colTypeFloat columnType = 0x04
colTypeDouble columnType = 0x05
colTypeNull columnType = 0x06
colTypeTimestamp columnType = 0x07
colTypeLonglong columnType = 0x08
colTypeInt24 columnType = 0x09
colTypeDate columnType = 0x0a
colTypeTime columnType = 0x0b
colTypeDatetime columnType = 0x0c
colTypeYear columnType = 0x0d
colTypeNewDate columnType = 0x0e // Internal
colTypeVarchar columnType = 0x0f
colTypeBit columnType = 0x10
colTypeTimestamp2 columnType = 0x11 // Internal
colTypeDatetime2 columnType = 0x12 // Internal
colTypeTime2 columnType = 0x13 // Internal
colTypeJSON columnType = 0xF5
colTypeNewDecimal columnType = 0xF6
colTypeEnum columnType = 0xF7
colTypeSet columnType = 0xF8
colTypeTinyblob columnType = 0xF9
colTypeMediumblob columnType = 0xFA
colTypeLongblob columnType = 0xFB
colTypeBlob columnType = 0xFC
colTypeVarstring columnType = 0xFD
colTypeString columnType = 0xFE
colTypeGeometry columnType = 0xFF
)
func (ct columnType) String() string {
switch ct {
case colTypeDecimal:
return "Decimal"
case colTypeTiny:
return "Tiny"
case colTypeShort:
return "Short"
case colTypeLong:
return "Long"
case colTypeFloat:
return "Float"
case colTypeDouble:
return "Double"
case colTypeNull:
return "Null"
case colTypeTimestamp:
return "Timestamp"
case colTypeLonglong:
return "Longlong"
case colTypeInt24:
return "Int24"
case colTypeDate:
return "Date"
case colTypeTime:
return "Time"
case colTypeDatetime:
return "Datetime"
case colTypeYear:
return "Year"
case colTypeNewDate:
return "NewDate"
case colTypeVarchar:
return "Varchar"
case colTypeBit:
return "Bit"
case colTypeTimestamp2:
return "Timestamp2"
case colTypeDatetime2:
return "Datetime2"
case colTypeTime2:
return "Time2"
case colTypeJSON:
return "JSON"
case colTypeNewDecimal:
return "NewDecimal"
case colTypeEnum:
return "Enum"
case colTypeSet:
return "Set"
case colTypeTinyblob:
return "Tinyblob"
case colTypeMediumblob:
return "Mediumblob"
case colTypeLongblob:
return "Longblob"
case colTypeBlob:
return "Blob"
case colTypeVarstring:
return "Varstring"
case colTypeString:
return "String"
case colTypeGeometry:
return "Geometry"
default:
return fmt.Sprintf("Unknown(%d)", ct)
}
}

211
encoding.go Normal file
View File

@ -0,0 +1,211 @@
package blt
import (
"encoding/binary"
"fmt"
)
// Protocol::FixedLengthInteger
// A fixed-length integer stores its value in a series of bytes with the least
// significant byte first (little endian).
// Spec: https://dev.mysql.com/doc/internals/en/integer.html#fixed-length-integer
// int<1>
func encodeUint8(data []byte, v uint8) {
data[0] = v
}
func decodeUint8(data []byte) uint8 {
return uint8(data[0])
}
// int<2>
func encodeUint16(data []byte, v uint16) {
binary.LittleEndian.PutUint16(data, v)
}
func decodeUint16(data []byte) uint16 {
return binary.LittleEndian.Uint16(data)
}
// int<3>
func encodeUint24(data []byte, v uint32) {
encodeVarLen64(data, uint64(v), 3)
}
func decodeUint24(data []byte) uint32 {
return uint32(decodeVarLen64(data, 3))
}
// int<4>
func encodeUint32(data []byte, v uint32) {
binary.LittleEndian.PutUint32(data, v)
}
func decodeUint32(data []byte) uint32 {
return binary.LittleEndian.Uint32(data)
}
// int<6>
func encodeUint48(data []byte, v uint64) {
encodeVarLen64(data, v, 6)
}
func decodeUint48(data []byte) uint64 {
return decodeVarLen64(data, 6)
}
// int<8>
func encodeUint64(data []byte, v uint64) {
binary.LittleEndian.PutUint64(data, v)
}
func decodeUint64(data []byte) uint64 {
return binary.LittleEndian.Uint64(data)
}
// Protocol::LengthEncodedInteger
// An integer that consumes 1, 3, 4, or 9 bytes, depending on its numeric value.
// Spec: https://dev.mysql.com/doc/internals/en/integer.html#length-encoded-integer
// To convert a number value into a length-encoded integer:
// If the value is < 251, it is stored as a 1-byte integer.
// If the value is ≥ 251 and < (2^16), it is stored as 0xFC + 2-byte integer.
// If the value is ≥ (2^16) and < (2^24), it is stored as 0xFD + 3-byte integer.
// If the value is ≥ (2^24) and < (2^64) it is stored as 0xFE + 8-byte integer.
// Note: up to MySQL 3.22, 0xFE was followed by a 4-byte integer.
func encodeUintLenEnc(data []byte, v uint64, isNull bool) (size int) {
switch {
case isNull:
data[0] = 0xFB
return 1
case v <= 0xFB:
data[0] = byte(v)
return 1
case v <= 2<<15:
data[0] = 0xFC
encodeVarLen64(data[1:], v, 2)
return 3
case v <= 2<<23:
data[0] = 0xFD
encodeVarLen64(data[1:], v, 3)
return 4
default:
data[0] = 0xFE
encodeVarLen64(data[1:], v, 8)
return 9
}
}
// To convert a length-encoded integer into its numeric value, check the first
// byte:
// If it is < 0xFB, treat it as a 1-byte integer.
// If it is 0xFC, it is followed by a 2-byte integer.
// If it is 0xFD, it is followed by a 3-byte integer.
// If it is 0xFE, it is followed by a 8-byte integer.
// Depending on the context, the first byte may also have other meanings:
// If it is 0xFB, it is represents a NULL in a ProtocolText::ResultsetRow.
// If it is 0xFF and is the first byte of an ERR_Packet
// Caution:
// If the first byte of a packet is a length-encoded integer and its byte value
// is 0xFE, you must check the length of the packet to verify that it has enough
// space for a 8-byte integer.
// If not, it may be an EOF_Packet instead.
func decodeUintLenEnc(data []byte) (v uint64, isNull bool, size int) {
switch data[0] {
case 0xFB:
return 0xFB, true, 1
case 0xFC:
return decodeVarLen64(data[1:], 2), false, 3
case 0xFD:
return decodeVarLen64(data[1:], 3), false, 4
case 0xFE:
return decodeVarLen64(data[1:], 8), false, 9
default:
return uint64(data[0]), false, 1
}
}
//
// Variable length encoding helpers
//
func encodeVarLen64(data []byte, v uint64, s int) {
for i := 0; i < s; i++ {
data[i] = byte(v >> uint(i*8))
}
}
func decodeVarLen64(data []byte, s int) uint64 {
v := uint64(data[0])
for i := 1; i < s; i++ {
v |= uint64(data[i]) << uint(i*8)
}
return v
}
// Protocol::NulTerminatedString
// Strings that are terminated by a 0x00 byte.
func decodeStringNullTerm(data []byte) []byte {
for i, c := range data {
if c == 0x00 {
s := make([]byte, i+1)
copy(s, data[:i])
return s
}
}
s := make([]byte, len(data))
copy(s, data)
return s
}
// Protocol::VariableLengthString
// The length of the string is determined by another field or is calculated at
// runtime.
// Protocol::FixedLengthString
// Fixed-length strings have a known, hardcoded length.
func encodeStringVarLen(data, str []byte) {
copy(data, str)
}
func decodeStringVarLen(data []byte, n int) []byte {
return decodeStringEOF(data[:n])
}
// Protocol::LengthEncodedString
// A length encoded string is a string that is prefixed with length encoded
// integer describing the length of the string.
// It is a special case of Protocol::VariableLengthString
func decodeStringLenEnc(data []byte) (str []byte, size int) {
strlen, _, size := decodeUintLenEnc(data)
strleni := int(strlen)
s := make([]byte, strleni)
copy(s, data[size:size+strleni])
return s, size + strleni
}
// Protocol::RestOfPacketString
// If a string is the last component of a packet, its length can be calculated
// from the overall packet length minus the current position.
func decodeStringEOF(data []byte) []byte {
s := make([]byte, len(data))
copy(s, data)
return s
}
func trimString(str []byte) string {
fmt.Println(str, string(str))
for i, c := range str {
if c == 0x00 {
return string(str[:i])
}
}
return string(str)
}

9
encoding_test.go Normal file
View File

@ -0,0 +1,9 @@
package blt
import "testing"
func TestEncodeUint8(t *testing.T) {
buf := make([]byte, 1)
encodeUint8(buf, 123)
t.Log(buf)
}

115
event_format_description.go Normal file
View File

@ -0,0 +1,115 @@
package blt
import (
"fmt"
"strconv"
"strings"
"github.com/localhots/pretty"
)
// FormatDescription is a description of binary log format.
type FormatDescription struct {
Version uint16
ServerVersion string
CreateTimestamp uint32
EventHeaderLength uint8
EventTypeHeaderLengths []uint8
ServerDetails ServerDetails
}
// ServerDetails contains server feature details.
type ServerDetails struct {
Flavor Flavor
Version int
ChecksumAlgorithm ChecksumAlgorithm
}
// Flavor defines the specific kind of MySQL-like database.
type Flavor string
// ChecksumAlgorithm is a checksum algorithm is the one used by the server.
type ChecksumAlgorithm byte
const (
// FlavorMySQL is the MySQL db flavor.
FlavorMySQL = "MySQL"
)
const (
// ChecksumAlgorithmNone means no checksum appened.
ChecksumAlgorithmNone ChecksumAlgorithm = 0x00
// ChecksumAlgorithmCRC32 used to append a 4 byte checksum at the end.
ChecksumAlgorithmCRC32 ChecksumAlgorithm = 0x01
// ChecksumAlgorithmUndefined is used when checksum algorithm is not known.
ChecksumAlgorithmUndefined ChecksumAlgorithm = 0xFF
)
// Spec: https://dev.mysql.com/doc/internals/en/format-description-event.html
func decodeFormatDescription(data []byte) FormatDescription {
buf := newReadBuffer(data)
fd := FormatDescription{
Version: buf.readUint16(),
ServerVersion: string(trimString(buf.readStringVarLen(50))),
CreateTimestamp: buf.readUint32(),
EventHeaderLength: buf.readUint8(),
EventTypeHeaderLengths: buf.readStringEOF(),
}
pretty.Println(fd)
fd.ServerDetails = ServerDetails{
Flavor: FlavorMySQL,
Version: parseVersionNumber(fd.ServerVersion),
ChecksumAlgorithm: ChecksumAlgorithmUndefined,
}
if fd.ServerDetails.Version > 50601 {
// Last 5 bytes are:
// [1] Checksum algorithm
// [4] Checksum
fd.ServerDetails.ChecksumAlgorithm = ChecksumAlgorithm(data[len(data)-5])
fd.EventTypeHeaderLengths = fd.EventTypeHeaderLengths[:len(fd.EventTypeHeaderLengths)-5]
}
return fd
}
func (fd FormatDescription) tableIDSize(et EventType) int {
if fd.headerLen(et) == 6 {
return 4
}
return 6
}
func (fd FormatDescription) headerLen(et EventType) int {
return int(fd.EventTypeHeaderLengths[et-1])
}
func (ca ChecksumAlgorithm) String() string {
switch ca {
case ChecksumAlgorithmNone:
return "None"
case ChecksumAlgorithmCRC32:
return "CRC32"
case ChecksumAlgorithmUndefined:
return "Undefined"
default:
return fmt.Sprintf("Unknown(%d)", ca)
}
}
// parseVersionNumber turns string version into a number just like the library
// mysql_get_server_version function does.
// Example: 5.7.19-log gets represented as 50719
// Spec: https://dev.mysql.com/doc/refman/8.0/en/mysql-get-server-version.html
func parseVersionNumber(v string) int {
tokens := strings.Split(v, ".")
major, _ := strconv.Atoi(tokens[0])
minor, _ := strconv.Atoi(tokens[1])
var patch int
for i, c := range tokens[2] {
if c < '0' || c > '9' {
patch, _ = strconv.Atoi(tokens[2][:i])
break
}
}
return major*10000 + minor*100 + patch
}

92
event_header.go Normal file
View File

@ -0,0 +1,92 @@
package blt
import (
"errors"
"github.com/localhots/pretty"
)
var (
// ErrInvalidHeader is returned when event header cannot be parsed.
ErrInvalidHeader = errors.New("Header is invalid")
)
// EventHeader represents binlog event header.
type EventHeader struct {
Timestamp uint32
Type EventType
ServerID uint32
EventLen uint32
NextOffset uint32
Flags uint16
ExtraHeaders []byte
eventBody []byte
}
// Spec: https://dev.mysql.com/doc/internals/en/event-header-fields.html
func (r *Reader) parseHeader(data []byte) (*EventHeader, error) {
headerLen := r.headerLen()
if len(data) < headerLen {
return nil, ErrInvalidHeader
}
// pretty.Println(headerLen, data)
buf := newReadBuffer(data)
h := &EventHeader{
Timestamp: buf.readUint32(),
Type: EventType(buf.readUint8()),
ServerID: buf.readUint32(),
EventLen: buf.readUint32(),
}
if r.format.Version == 0 || r.format.Version >= 3 {
h.NextOffset = buf.readUint32()
h.Flags = buf.readUint16()
}
if r.format.Version >= 4 {
h.ExtraHeaders = buf.readStringVarLen(headerLen - 19)
}
h.eventBody = buf.cur()
if h.NextOffset > 0 {
r.state.Offset = uint64(h.NextOffset)
}
csa := r.format.ServerDetails.ChecksumAlgorithm
if h.Type != FormatDescriptionEvent && csa == ChecksumAlgorithmCRC32 {
h.eventBody = h.eventBody[:len(h.eventBody)-4]
}
// pretty.Println(h)
switch h.Type {
case FormatDescriptionEvent:
r.format = decodeFormatDescription(h.eventBody)
pretty.Println(h.Type.String(), r.format)
case RotateEvent:
r.state = r.decodeRotateEvent(h.eventBody)
pretty.Println(h.Type.String(), r.state)
case TableMapEvent:
tm := r.decodeTableMap(h.eventBody)
r.tableMap[tm.TableID] = tm
// pretty.Println(h.Type.String(), tm)
case WriteRowsEventV0, WriteRowsEventV1, WriteRowsEventV2,
UpdateRowsEventV0, UpdateRowsEventV1, UpdateRowsEventV2,
DeleteRowsEventV0, DeleteRowsEventV1, DeleteRowsEventV2:
r.decodeRowsEvent(h.eventBody, h.Type)
case XIDEvent, GTIDEvent:
// TODO: Add support for these too
case QueryEvent:
// TODO: Handle schema changes
}
return h, nil
}
func (r *Reader) headerLen() int {
const defaultHeaderLength = 19
if r.format.EventHeaderLength > 0 {
return int(r.format.EventHeaderLength)
}
return defaultHeaderLength
}

13
event_rotate.go Normal file
View File

@ -0,0 +1,13 @@
package blt
func (r *Reader) decodeRotateEvent(data []byte) Position {
buf := newReadBuffer(data)
var p Position
if r.format.Version > 1 {
p.Offset = buf.readUint64()
} else {
p.Offset = 4
}
p.File = string(buf.readStringEOF())
return p
}

64
event_rows.go Normal file
View File

@ -0,0 +1,64 @@
package blt
import (
"fmt"
"github.com/localhots/pretty"
)
// Rows contains a Rows Event.
type Rows struct {
EventType EventType
TableID uint64
Flags uint16
ExtraData []byte
ColumnCount uint64
ColumnBitmap1 []byte
ColumnBitmap2 []byte
Rows [][]interface{}
}
type rowsFlag uint16
const (
rowsFlagEndOfStatement rowsFlag = 0x0001
rowsFlagNoForeignKeyChecks rowsFlag = 0x0002
rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004
rowsFlagRowHasColumns rowsFlag = 0x0008
freeTableMapID = 0x00FFFFFF
)
func (r *Reader) decodeRowsEvent(data []byte, typ EventType) {
// pretty.Println(data)
buf := newReadBuffer(data)
rows := Rows{EventType: typ}
idSize := r.format.tableIDSize(typ)
if idSize == 6 {
rows.TableID = buf.readUint48()
} else {
rows.TableID = uint64(buf.readUint32())
}
rows.Flags = buf.readUint16()
if typ.isEither(WriteRowsEventV2, UpdateRowsEventV2, DeleteRowsEventV2) {
// Extra data length is part of extra data, deduct 2 bytes as they
// already store its length
extraLen := buf.readUint16() - 2
rows.ExtraData = buf.readStringVarLen(int(extraLen))
}
rows.ColumnCount, _ = buf.readUintLenEnc()
rows.ColumnBitmap1 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8)
if typ.isEither(UpdateRowsEventV2, UpdateRowsEventV1) {
rows.ColumnBitmap2 = buf.readStringVarLen(int(rows.ColumnCount+7) / 8)
}
tm, ok := r.tableMap[rows.TableID]
if !ok {
panic(fmt.Errorf("Out of sync: no table map definition for ID=%d", rows.TableID))
}
pretty.Println(typ.String(), rows, tm, buf.cur())
}

59
event_table_map.go Normal file
View File

@ -0,0 +1,59 @@
package blt
// TableMap ...
type TableMap struct {
TableID uint64
Flags uint16
SchemaName string
TableName string
ColumnCount uint64
ColumnTypes []byte
ColumnMeta []uint16
NullBitmask []byte
}
// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html
func (r *Reader) decodeTableMap(data []byte) TableMap {
buf := newReadBuffer(data)
var tm TableMap
idSize := r.format.tableIDSize(TableMapEvent)
if idSize == 6 {
tm.TableID = buf.readUint48()
} else {
tm.TableID = uint64(buf.readUint32())
}
tm.Flags = buf.readUint16()
tm.SchemaName = string(buf.readStringLenEnc())
buf.skip(1) // Always 0x00
tm.TableName = string(buf.readStringLenEnc())
buf.skip(1) // Always 0x00
tm.ColumnCount, _ = buf.readUintLenEnc()
tm.ColumnTypes = buf.readStringVarLen(int(tm.ColumnCount))
tm.ColumnMeta = decodeColumnMeta(buf.readStringLenEnc(), tm.ColumnTypes)
tm.NullBitmask = buf.readStringVarLen(int(tm.ColumnCount+8) / 7)
return tm
}
func decodeColumnMeta(data []byte, cols []byte) []uint16 {
pos := 0
meta := make([]uint16, len(cols))
for i, typ := range cols {
switch columnType(typ) {
case colTypeString, colTypeNewDecimal:
// TODO: Is that correct?
meta[i] = uint16(data[pos])<<8 | uint16(data[pos+1])
pos += 2
case colTypeVarchar, colTypeVarstring, colTypeBit:
// TODO: Is that correct?
meta[i] = decodeUint16(data[pos:])
pos += 2
case colTypeFloat, colTypeDouble, colTypeBlob, colTypeGeometry, colTypeJSON,
colTypeTime2, colTypeDatetime2, colTypeTimestamp2:
meta[i] = uint16(data[pos])
pos++
}
}
return meta
}

212
event_types.go Normal file
View File

@ -0,0 +1,212 @@
package blt
import (
"fmt"
)
// EventType defines a binary log event type.
type EventType byte
// Spec: https://dev.mysql.com/doc/internals/en/event-classes-and-types.html
const (
// UnknownEvent is an event that should never occur.
UnknownEvent EventType = 0
// StartEventV3 is the Start_event of binlog format 3.
StartEventV3 EventType = 1
// QueryEvent is created for each query that modifies the database, unless
// the query is logged row-based.
QueryEvent EventType = 2
// StopEvent is written to the log files under these circumstances:
// A master writes the event to the binary log when it shuts down.
// A slave writes the event to the relay log when it shuts down or when a
// RESET SLAVE statement is executed.
StopEvent EventType = 3
// RotateEvent is written at the end of the file that points to the next
// file in the squence. It is written when a binary log file exceeds a size
// limit.
RotateEvent EventType = 4
// IntvarEvent will be created just before a Query_event, if the query uses
// one of the variables LAST_INSERT_ID or INSERT_ID.
IntvarEvent EventType = 5
// LoadEvent ...
LoadEvent EventType = 6
// SlaveEvent ...
SlaveEvent EventType = 7
// CreateFileEvent ...
CreateFileEvent EventType = 8
// AppendBlockEvent is created to contain the file data.
AppendBlockEvent EventType = 9
// ExecLoadEvent ...
ExecLoadEvent EventType = 10
// DeleteFileEvent occurs when the LOAD DATA failed on the master.
// This event notifies the slave not to do the load and to delete the
// temporary file.
DeleteFileEvent EventType = 11
// NewLoadEvent ...
NewLoadEvent EventType = 12
// RandEvent logs random seed used by the next RAND(), and by PASSWORD()
// in 4.1.0.
RandEvent EventType = 13
// UserVarEvent is written every time a statement uses a user variable;
// precedes other events for the statement. Indicates the value to use for
// the user variable in the next statement. This is written only before a
// QUERY_EVENT and is not used with row-based logging.
UserVarEvent EventType = 14
// FormatDescriptionEvent is saved by threads which read it, as they need it
// for future use (to decode the ordinary events).
FormatDescriptionEvent EventType = 15
// XIDEvent is generated for a commit of a transaction that modifies one or
// more tables of an XA-capable storage engine.
XIDEvent EventType = 16
// BeginLoadQueryEvent is for the first block of file to be loaded, its only
// difference from Append_block event is that this event creates or
// truncates existing file before writing data.
BeginLoadQueryEvent EventType = 17
// ExecuteLoadQueryEvent is responsible for LOAD DATA execution, it similar
// to Query_event but before executing the query it substitutes original
// filename in LOAD DATA query with name of temporary file.
ExecuteLoadQueryEvent EventType = 18
// TableMapEvent is used in row-based mode where it preceeds every row
// operation event and maps a table definition to a number. The table
// definition consists of database name, table name, and column definitions.
TableMapEvent EventType = 19
// WriteRowsEventV0 represents inserted rows. Used in MySQL 5.1.0 to 5.1.15.
WriteRowsEventV0 EventType = 20
// UpdateRowsEventV0 represents updated rows. It contains both old and new
// versions. Used in MySQL 5.1.0 to 5.1.15.
UpdateRowsEventV0 EventType = 21
// DeleteRowsEventV0 represents deleted rows. Used in MySQL 5.1.0 to 5.1.15.
DeleteRowsEventV0 EventType = 22
// WriteRowsEventV1 represents inserted rows. Used in MySQL 5.1.15 to 5.6.
WriteRowsEventV1 EventType = 23
// UpdateRowsEventV1 represents updated rows. It contains both old and new
// versions. Used in MySQL 5.1.15 to 5.6.
UpdateRowsEventV1 EventType = 24
// DeleteRowsEventV1 represents deleted rows. Used in MySQL 5.1.15 to 5.6.
DeleteRowsEventV1 EventType = 25
// IncidentEvent represents an incident, an occurance out of the ordinary,
// that happened on the master. The event is used to inform the slave that
// something out of the ordinary happened on the master that might cause the
// database to be in an inconsistent state.
IncidentEvent EventType = 26
// HeartbeetEvent is a replication event used to ensure to slave that master
// is alive. The event is originated by master's dump thread and sent
// straight to slave without being logged. Slave itself does not store it in
// relay log but rather uses a data for immediate checks and throws away the
// event.
HeartbeetEvent EventType = 27
// IgnorableEvent is a kind of event that could be ignored.
IgnorableEvent EventType = 28
// RowsQueryEvent is a subclass of the IgnorableEvent, to record the
// original query for the rows events in RBR.
RowsQueryEvent EventType = 29
// WriteRowsEventV2 represents inserted rows. Used starting from MySQL 5.6.
WriteRowsEventV2 EventType = 30
// UpdateRowsEventV2 represents updated rows. It contains both old and new
// versions. Used starting from MySQL 5.6.
UpdateRowsEventV2 EventType = 31
// DeleteRowsEventV2 represents deleted rows. Used starting from MySQL 5.6.
DeleteRowsEventV2 EventType = 32
// GTIDEvent is an event that contains latest GTID.
// GTID stands for Global Transaction IDentifier It is composed of two
// parts:
// * SID for Source Identifier, and
// * GNO for Group Number. The basic idea is to associate an identifier, the
// Global Transaction IDentifier or GTID, to every transaction. When a
// transaction is copied to a slave, re-executed on the slave, and written
// to the slave's binary log, the GTID is preserved. When a slave connects
// to a master, the slave uses GTIDs instead of (file, offset).
GTIDEvent EventType = 33
// AnonymousGTIDEvent is a subclass of GTIDEvent.
AnonymousGTIDEvent EventType = 34
// PreviousGTIDsEvent is a subclass of GTIDEvent.
PreviousGTIDsEvent EventType = 35
)
func (et EventType) isEither(types ...EventType) bool {
for _, t := range types {
if et == t {
return true
}
}
return false
}
func (et EventType) String() string {
switch et {
case UnknownEvent:
return "UnknownEvent"
case StartEventV3:
return "StartEventV3"
case QueryEvent:
return "QueryEvent"
case StopEvent:
return "StopEvent"
case RotateEvent:
return "RotateEvent"
case IntvarEvent:
return "IntvarEvent"
case LoadEvent:
return "LoadEvent"
case SlaveEvent:
return "SlaveEvent"
case CreateFileEvent:
return "CreateFileEvent"
case AppendBlockEvent:
return "AppendBlockEvent"
case ExecLoadEvent:
return "ExecLoadEvent"
case DeleteFileEvent:
return "DeleteFileEvent"
case NewLoadEvent:
return "NewLoadEvent"
case RandEvent:
return "RandEvent"
case UserVarEvent:
return "UserVarEvent"
case FormatDescriptionEvent:
return "FormatDescriptionEvent"
case XIDEvent:
return "XIDEvent"
case BeginLoadQueryEvent:
return "BeginLoadQueryEvent"
case ExecuteLoadQueryEvent:
return "ExecuteLoadQueryEvent"
case TableMapEvent:
return "TableMapEvent"
case WriteRowsEventV0:
return "WriteRowsEventV0"
case UpdateRowsEventV0:
return "UpdateRowsEventV0"
case DeleteRowsEventV0:
return "DeleteRowsEventV0"
case WriteRowsEventV1:
return "WriteRowsEventV1"
case UpdateRowsEventV1:
return "UpdateRowsEventV1"
case DeleteRowsEventV1:
return "DeleteRowsEventV1"
case IncidentEvent:
return "IncidentEvent"
case HeartbeetEvent:
return "HeartbeetEvent"
case IgnorableEvent:
return "IgnorableEvent"
case RowsQueryEvent:
return "RowsQueryEvent"
case WriteRowsEventV2:
return "WriteRowsEventV2"
case UpdateRowsEventV2:
return "UpdateRowsEventV2"
case DeleteRowsEventV2:
return "DeleteRowsEventV2"
case GTIDEvent:
return "GTIDEvent"
case AnonymousGTIDEvent:
return "AnonymousGTIDEvent"
case PreviousGTIDsEvent:
return "PreviousGTIDsEvent"
default:
return fmt.Sprintf("Unknown(%d)", et)
}
}

197
reader.go Normal file
View File

@ -0,0 +1,197 @@
package blt
import (
"context"
"database/sql/driver"
"fmt"
"io"
"os"
"github.com/juju/errors"
"github.com/localhots/gobelt/log"
"github.com/localhots/mysql"
)
// Reader ...
type Reader struct {
conn *mysql.ExtendedConn
conf Config
state Position
format FormatDescription
tableMap map[uint64]TableMap
}
// Config ...
type Config struct {
ServerID uint32
File string
Offset uint32
Hostname string
}
// Position ...
type Position struct {
File string
Offset uint64
}
const (
// Bytes
resultOK byte = 0x00
resultEOF byte = 0xFE
resultERR byte = 0xFF
)
// NewReader ...
func NewReader(conn driver.Conn, conf Config) (*Reader, error) {
if conf.Hostname == "" {
name, err := os.Hostname()
if err != nil {
return nil, err
}
conf.Hostname = name
}
extconn, err := mysql.ExtendConn(conn)
if err != nil {
return nil, err
}
r := &Reader{
conn: extconn,
conf: conf,
tableMap: make(map[uint64]TableMap),
}
if err := r.disableChecksum(); err != nil {
return nil, errors.Annotate(err, "Failed to disable binlog checksum")
}
if err := r.registerSlave(); err != nil {
return nil, errors.Annotate(err, "Failed to register slave server")
}
if err := r.binlogDump(); err != nil {
return nil, errors.Annotate(err, "Failed to start binlog dump")
}
return r, nil
}
// Connect ...
func Connect(dsn string, conf Config) (*Reader, error) {
conn, err := (&mysql.MySQLDriver{}).Open(dsn)
if err != nil {
return nil, err
}
return NewReader(conn, conf)
}
// ReadEventHeader reads next event from the log and decodes its header. Header
// is then used to decode the event.
func (r *Reader) ReadEventHeader(ctx context.Context) (*EventHeader, error) {
data, err := r.conn.ReadPacket()
if err != nil {
return nil, err
}
switch data[0] {
case resultOK:
return r.parseHeader(data[1:])
case resultERR:
return nil, r.conn.HandleErrorPacket(data)
case resultEOF:
log.Debug(ctx, "EOF received")
return nil, nil
default:
log.Errorf(ctx, "Unexpected header: %x", data[0])
return nil, nil
}
}
// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html
func (r *Reader) registerSlave() error {
const comRegisterSlave byte = 21
r.conn.ResetSequence()
buf := newCommandBuffer(1 + 4 + 1 + len(r.conf.Hostname) + 1 + 1 + 2 + 4 + 4)
buf.writeByte(comRegisterSlave)
buf.writeUint32(r.conf.ServerID)
buf.writeString(r.conf.Hostname)
// The rest of the payload would be zeroes, consider following code for
// reference:
//
// buf.writeString(username)
// buf.writeString(password)
// buf.writeUint16(port)
// buf.writeUint32(replicationRank)
// buf.writeUint32(masterID)
return r.runCmd(buf)
}
// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func (r *Reader) binlogDump() error {
const comBinlogDump byte = 18
r.conn.ResetSequence()
r.state.File = r.conf.File
r.state.Offset = uint64(r.conf.Offset)
// First event offset is 4
if r.state.Offset < 4 {
r.state.Offset = 4
}
buf := newCommandBuffer(1 + 4 + 2 + 4 + len(r.state.File))
buf.writeByte(comBinlogDump)
buf.writeUint32(uint32(r.state.Offset))
buf.skip(2) // Flags
buf.writeUint32(r.conf.ServerID)
buf.writeStringEOF(r.state.File)
return r.runCmd(buf)
}
func (r *Reader) runCmd(buf *buffer) error {
err := r.conn.WritePacket(buf.data)
if err != nil {
return err
}
return r.conn.ReadResultOK()
}
func (r *Reader) disableChecksum() error {
cs, err := r.getVar("BINLOG_CHECKSUM")
if err != nil {
return err
}
if cs != "NONE" {
return r.setVar("@master_binlog_checksum", "NONE")
}
return nil
}
func (r *Reader) getVar(name string) (string, error) {
rows, err := r.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{})
if err != nil {
return "", notEOF(err)
}
defer rows.Close()
res := make([]driver.Value, len(rows.Columns()))
err = rows.Next(res)
if err != nil {
return "", notEOF(err)
}
return string(res[1].([]byte)), nil
}
func (r *Reader) setVar(name, val string) error {
return r.conn.Exec(fmt.Sprintf("SET %s=%q", name, val))
}
func notEOF(err error) error {
if err == io.EOF {
return nil
}
return err
}