Major refactoring

The thing still works somehow
This commit is contained in:
2018-11-06 22:51:56 +01:00
parent c4bbbc439f
commit 5ef0642499
21 changed files with 1236 additions and 1042 deletions
+117
View File
@@ -0,0 +1,117 @@
package reader
import (
"github.com/juju/errors"
"github.com/localhots/blt/binlog"
"github.com/localhots/blt/schema"
"github.com/localhots/pretty"
)
// Reader ...
type Reader struct {
conn *SlaveConn
state binlog.Position
format binlog.FormatDescription
tableMap map[uint64]binlog.TableDescription
schema *schema.Schema
}
// Event ...
type Event struct {
Header binlog.EventHeader
Body []byte
}
// NewReader ...
func NewReader(conn *SlaveConn) (*Reader, error) {
r := &Reader{
conn: conn,
tableMap: make(map[uint64]binlog.TableDescription),
}
if err := conn.DisableChecksum(); err != nil {
return nil, errors.Annotate(err, "disable binlog checksum")
}
if err := conn.RegisterSlave(); err != nil {
return nil, errors.Annotate(err, "register slave server")
}
if err := conn.StartBinlogDump(); err != nil {
return nil, errors.Annotate(err, "start binlog dump")
}
return r, nil
}
// ReadEvent ...
func (r *Reader) ReadEvent() (*Event, error) {
connBuff, err := r.conn.ReadPacket()
if err != nil {
return nil, err
}
var evt Event
if err := evt.Header.Decode(connBuff, r.format); err != nil {
return nil, errors.Annotate(err, "decode event header")
}
if evt.Header.NextOffset > 0 {
r.state.Offset = uint64(evt.Header.NextOffset)
}
evt.Body = connBuff[r.format.HeaderLen():]
csa := r.format.ServerDetails.ChecksumAlgorithm
if evt.Header.Type != binlog.EventTypeFormatDescription && csa == binlog.ChecksumAlgorithmCRC32 {
evt.Body = evt.Body[:len(evt.Body)-4]
}
// pretty.Println(h)
switch evt.Header.Type {
case binlog.EventTypeFormatDescription:
var fde binlog.FormatDescriptionEvent
if err = fde.Decode(evt.Body); err == nil {
r.format = fde.FormatDescription
}
pretty.Println(evt.Header.Type.String(), r.format)
case binlog.EventTypeRotate:
var re binlog.RotateEvent
if err = re.Decode(evt.Body, r.format); err == nil {
r.state = re.NextFile
}
pretty.Println(evt.Header.Type.String(), r.state)
case binlog.EventTypeTableMap:
var tme binlog.TableMapEvent
if err = tme.Decode(evt.Body, r.format); err == nil {
r.tableMap[tme.TableID] = tme.TableDescription
}
// pretty.Println(evt.Header.Type.String(), tm)
case binlog.EventTypeWriteRowsV0,
binlog.EventTypeWriteRowsV1,
binlog.EventTypeWriteRowsV2,
binlog.EventTypeUpdateRowsV0,
binlog.EventTypeUpdateRowsV1,
binlog.EventTypeUpdateRowsV2,
binlog.EventTypeDeleteRowsV0,
binlog.EventTypeDeleteRowsV1,
binlog.EventTypeDeleteRowsV2:
re := binlog.RowsEvent{Type: evt.Header.Type}
tableID := re.PeekTableID(evt.Body, r.format)
td, ok := r.tableMap[tableID]
if !ok {
return nil, errors.New("Unknown table ID")
}
if err = re.Decode(evt.Body, r.format, td); err == nil {
pretty.Println(re)
}
case binlog.EventTypeXID:
// TODO: Add support
case binlog.EventTypeGTID:
// TODO: Add support
case binlog.EventTypeQuery:
// TODO: Handle schema changes
}
return &evt, err
}
+172
View File
@@ -0,0 +1,172 @@
package reader
import (
"database/sql/driver"
"encoding/hex"
"fmt"
"io"
"os"
"github.com/localhots/blt/tools"
"github.com/localhots/mysql"
)
// SlaveConn ...
type SlaveConn struct {
conn *mysql.ExtendedConn
conf Config
}
// Config ...
type Config struct {
ServerID uint32
File string
Offset uint32
Hostname string
}
const (
// Commands
comRegisterSlave byte = 21
comBinlogDump byte = 18
// Bytes
resultOK byte = 0x00
resultEOF byte = 0xFE
resultERR byte = 0xFF
)
// Connect ...
func Connect(dsn string, conf Config) (*SlaveConn, error) {
if conf.Hostname == "" {
name, err := os.Hostname()
if err != nil {
return nil, err
}
conf.Hostname = name
}
conf.Hostname = "localhost"
if conf.Offset == 0 {
conf.Offset = 4
}
conn, err := (&mysql.MySQLDriver{}).Open(dsn)
if err != nil {
return nil, err
}
extconn, err := mysql.ExtendConn(conn)
if err != nil {
return nil, err
}
return &SlaveConn{conn: extconn, conf: conf}, nil
}
// ReadPacket reads next packet from the server and processes the first status
// byte.
func (c *SlaveConn) ReadPacket() ([]byte, error) {
data, err := c.conn.ReadPacket()
if err != nil {
return nil, err
}
switch data[0] {
case resultOK:
return data[1:], nil
case resultERR:
return nil, c.conn.HandleErrorPacket(data)
case resultEOF:
return nil, nil
default:
return nil, fmt.Errorf("unexpected header: %x", data[0])
}
}
// RegisterSlave issues a REGISTER_SLAVE command to master.
// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html
func (c *SlaveConn) RegisterSlave() error {
c.conn.ResetSequence()
buf := tools.NewCommandBuffer(1 + 4 + 1 + len(c.conf.Hostname) + 1 + 1 + 2 + 4 + 4)
buf.WriteByte(comRegisterSlave)
buf.WriteUint32(c.conf.ServerID)
buf.WriteStringLenEnc(c.conf.Hostname)
// The rest of the payload would be zeroes, consider following code for
// reference:
//
// buf.WriteStringLenEnc(username)
// buf.WriteStringLenEnc(password)
// buf.WriteUint16(port)
// buf.WriteUint32(replicationRank)
// buf.WriteUint32(masterID)
fmt.Println(hex.Dump(buf.Bytes()))
return c.runCmd(buf.Bytes())
}
// StartBinlogDump issues a BINLOG_DUMP command to master.
// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func (c *SlaveConn) StartBinlogDump() error {
c.conn.ResetSequence()
buf := tools.NewCommandBuffer(1 + 4 + 2 + 4 + len(c.conf.File))
buf.WriteByte(comBinlogDump)
buf.WriteUint32(uint32(c.conf.Offset))
buf.Skip(2) // Flags
buf.WriteUint32(c.conf.ServerID)
buf.WriteStringEOF(c.conf.File)
return c.runCmd(buf.Bytes())
}
// DisableChecksum disables CRC32 checksums for this connection.
func (c *SlaveConn) DisableChecksum() error {
cs, err := c.GetVar("BINLOG_CHECKSUM")
if err != nil {
return err
}
if cs != "NONE" {
return c.SetVar("@master_binlog_checksum", "NONE")
}
return nil
}
// GetVar fetches value of the given variable.
func (c *SlaveConn) GetVar(name string) (string, error) {
rows, err := c.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{})
if err != nil {
return "", notEOF(err)
}
defer rows.Close()
res := make([]driver.Value, len(rows.Columns()))
err = rows.Next(res)
if err != nil {
return "", notEOF(err)
}
return string(res[1].([]byte)), nil
}
// SetVar assigns a new value to the given variable.
func (c *SlaveConn) SetVar(name, val string) error {
return c.conn.Exec(fmt.Sprintf("SET %s=%q", name, val))
}
func (c *SlaveConn) runCmd(data []byte) error {
err := c.conn.WritePacket(data)
if err != nil {
return err
}
return c.conn.ReadResultOK()
}
func notEOF(err error) error {
if err == io.EOF {
return nil
}
return err
}