1
0
Fork 0

Decouple slave connection and reader

This commit is contained in:
Gregory Eremin 2018-11-11 15:26:40 +01:00
parent 8c8705581e
commit 6f61469eea
4 changed files with 35 additions and 39 deletions

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/localhots/bocadillo/reader"
"github.com/localhots/bocadillo/reader/slave"
)
func main() {
@ -20,23 +21,16 @@ func main() {
validate((*dsn != ""), "Database source name is not set")
validate((*id != 0), "Server ID is not set")
validate((*file != ""), "Binary log file is not set")
conf := reader.Config{
reader, err := reader.New(*dsn, slave.Config{
ServerID: uint32(*id),
File: *file,
Offset: uint32(*offset),
}
conn, err := reader.Connect(*dsn, conf)
if err != nil {
log.Fatalf("Failed to establish connection: %v", err)
}
reader, err := reader.NewReader(conn)
})
if err != nil {
log.Fatalf("Failed to create reader: %v", err)
}
off := conf.Offset
// for i := 0; i < 100; i++ {
for {
evt, err := reader.ReadEvent()
@ -44,8 +38,7 @@ func main() {
log.Fatalf("Failed to read event: %v", err)
}
ts := time.Unix(int64(evt.Header.Timestamp), 0).Format(time.RFC3339)
log.Printf("Event received: %s %s, %d\n", evt.Header.Type.String(), ts, off)
off = evt.Header.NextOffset
log.Printf("Event received: %s %s, %d\n", evt.Header.Type.String(), ts, evt.Header.NextOffset)
if evt.Table != nil {
_, err := evt.DecodeRows()

View File

@ -4,18 +4,19 @@ import (
"github.com/juju/errors"
"github.com/localhots/bocadillo/binlog"
"github.com/localhots/bocadillo/reader/schema"
"github.com/localhots/bocadillo/reader/slave"
)
// Reader ...
// Reader is a binary log reader.
type Reader struct {
conn *SlaveConn
conn *slave.Conn
state binlog.Position
format binlog.FormatDescription
tableMap map[uint64]binlog.TableDescription
schema *schema.Schema
}
// Event ...
// Event contains binlog event details.
type Event struct {
Format binlog.FormatDescription
Header binlog.EventHeader
@ -25,8 +26,13 @@ type Event struct {
Table *binlog.TableDescription
}
// NewReader ...
func NewReader(conn *SlaveConn) (*Reader, error) {
// New creates a new binary log reader.
func New(dsn string, sc slave.Config) (*Reader, error) {
conn, err := slave.Connect(dsn, sc)
if err != nil {
return nil, errors.Annotate(err, "establish slave connection")
}
r := &Reader{
conn: conn,
tableMap: make(map[uint64]binlog.TableDescription),
@ -45,7 +51,7 @@ func NewReader(conn *SlaveConn) (*Reader, error) {
return r, nil
}
// ReadEvent ...
// ReadEvent reads next event from the binary log.
func (r *Reader) ReadEvent() (*Event, error) {
connBuff, err := r.conn.ReadPacket()
if err != nil {

View File

@ -1,4 +1,4 @@
package reader
package slave
import (
"database/sql/driver"
@ -10,13 +10,14 @@ import (
"github.com/localhots/mysql"
)
// SlaveConn ...
type SlaveConn struct {
// Conn is a slave connection used to issue a binlog dump command.
type Conn struct {
conn *mysql.ExtendedConn
conf Config
}
// Config ...
// Config contains slave connection configuration. It is passed to master upon
// registration.
type Config struct {
ServerID uint32
File string
@ -35,8 +36,8 @@ const (
resultERR byte = 0xFF
)
// Connect ...
func Connect(dsn string, conf Config) (*SlaveConn, error) {
// Connect esablishes a new slave connection.
func Connect(dsn string, conf Config) (*Conn, error) {
if conf.Hostname == "" {
name, err := os.Hostname()
if err != nil {
@ -59,12 +60,12 @@ func Connect(dsn string, conf Config) (*SlaveConn, error) {
return nil, err
}
return &SlaveConn{conn: extconn, conf: conf}, nil
return &Conn{conn: extconn, conf: conf}, nil
}
// ReadPacket reads next packet from the server and processes the first status
// byte.
func (c *SlaveConn) ReadPacket() ([]byte, error) {
func (c *Conn) ReadPacket() ([]byte, error) {
data, err := c.conn.ReadPacket()
if err != nil {
return nil, err
@ -84,7 +85,7 @@ func (c *SlaveConn) ReadPacket() ([]byte, error) {
// RegisterSlave issues a REGISTER_SLAVE command to master.
// Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html
func (c *SlaveConn) RegisterSlave() error {
func (c *Conn) RegisterSlave() error {
c.conn.ResetSequence()
buf := tools.NewCommandBuffer(1 + 4 + 1 + len(c.conf.Hostname) + 1 + 1 + 2 + 4 + 4)
@ -106,7 +107,7 @@ func (c *SlaveConn) RegisterSlave() error {
// StartBinlogDump issues a BINLOG_DUMP command to master.
// Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
// TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func (c *SlaveConn) StartBinlogDump() error {
func (c *Conn) StartBinlogDump() error {
c.conn.ResetSequence()
buf := tools.NewCommandBuffer(1 + 4 + 2 + 4 + len(c.conf.File))
@ -120,7 +121,7 @@ func (c *SlaveConn) StartBinlogDump() error {
}
// DisableChecksum disables CRC32 checksums for this connection.
func (c *SlaveConn) DisableChecksum() error {
func (c *Conn) DisableChecksum() error {
cs, err := c.GetVar("BINLOG_CHECKSUM")
if err != nil {
return err
@ -133,7 +134,7 @@ func (c *SlaveConn) DisableChecksum() error {
}
// GetVar fetches value of the given variable.
func (c *SlaveConn) GetVar(name string) (string, error) {
func (c *Conn) GetVar(name string) (string, error) {
rows, err := c.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{})
if err != nil {
return "", notEOF(err)
@ -150,11 +151,11 @@ func (c *SlaveConn) GetVar(name string) (string, error) {
}
// SetVar assigns a new value to the given variable.
func (c *SlaveConn) SetVar(name, val string) error {
func (c *Conn) SetVar(name, val string) error {
return c.conn.Exec(fmt.Sprintf("SET %s=%q", name, val))
}
func (c *SlaveConn) runCmd(data []byte) error {
func (c *Conn) runCmd(data []byte) error {
err := c.conn.WritePacket(data)
if err != nil {
return err

View File

@ -11,6 +11,7 @@ import (
"github.com/localhots/bocadillo/binlog"
"github.com/localhots/bocadillo/reader"
"github.com/localhots/bocadillo/reader/slave"
)
var suite *testSuite
@ -30,12 +31,7 @@ func TestMain(m *testing.M) {
conf.Offset = uint32(pos.Offset)
}
slaveConn, err := reader.Connect(dsn, conf)
if err != nil {
log.Fatal(err)
}
rdr, err := reader.NewReader(slaveConn)
rdr, err := reader.New(dsn, conf)
if err != nil {
log.Fatal(err)
}
@ -50,7 +46,7 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}
func getConfig() (dsn string, conf reader.Config) {
func getConfig() (dsn string, conf slave.Config) {
envOrDefault := func(name, def string) string {
if val := os.Getenv(name); val != "" {
return val