From 6f61469eeaf3f9518952648514bb6dd4179af7c6 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 11 Nov 2018 15:26:40 +0100 Subject: [PATCH] Decouple slave connection and reader --- cmd/main.go | 17 +++++------------ reader/reader.go | 18 ++++++++++++------ reader/{ => slave}/slave_conn.go | 29 +++++++++++++++-------------- tests/integration_test.go | 10 +++------- 4 files changed, 35 insertions(+), 39 deletions(-) rename reader/{ => slave}/slave_conn.go (82%) diff --git a/cmd/main.go b/cmd/main.go index bc3b3fe..f838676 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,6 +8,7 @@ import ( "time" "github.com/localhots/bocadillo/reader" + "github.com/localhots/bocadillo/reader/slave" ) func main() { @@ -20,23 +21,16 @@ func main() { validate((*dsn != ""), "Database source name is not set") validate((*id != 0), "Server ID is not set") validate((*file != ""), "Binary log file is not set") - conf := reader.Config{ + + reader, err := reader.New(*dsn, slave.Config{ ServerID: uint32(*id), File: *file, Offset: uint32(*offset), - } - - conn, err := reader.Connect(*dsn, conf) - if err != nil { - log.Fatalf("Failed to establish connection: %v", err) - } - - reader, err := reader.NewReader(conn) + }) if err != nil { log.Fatalf("Failed to create reader: %v", err) } - off := conf.Offset // for i := 0; i < 100; i++ { for { evt, err := reader.ReadEvent() @@ -44,8 +38,7 @@ func main() { log.Fatalf("Failed to read event: %v", err) } ts := time.Unix(int64(evt.Header.Timestamp), 0).Format(time.RFC3339) - log.Printf("Event received: %s %s, %d\n", evt.Header.Type.String(), ts, off) - off = evt.Header.NextOffset + log.Printf("Event received: %s %s, %d\n", evt.Header.Type.String(), ts, evt.Header.NextOffset) if evt.Table != nil { _, err := evt.DecodeRows() diff --git a/reader/reader.go b/reader/reader.go index 2838576..9a52694 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -4,18 +4,19 @@ import ( "github.com/juju/errors" "github.com/localhots/bocadillo/binlog" "github.com/localhots/bocadillo/reader/schema" + "github.com/localhots/bocadillo/reader/slave" ) -// Reader ... +// Reader is a binary log reader. type Reader struct { - conn *SlaveConn + conn *slave.Conn state binlog.Position format binlog.FormatDescription tableMap map[uint64]binlog.TableDescription schema *schema.Schema } -// Event ... +// Event contains binlog event details. type Event struct { Format binlog.FormatDescription Header binlog.EventHeader @@ -25,8 +26,13 @@ type Event struct { Table *binlog.TableDescription } -// NewReader ... -func NewReader(conn *SlaveConn) (*Reader, error) { +// New creates a new binary log reader. +func New(dsn string, sc slave.Config) (*Reader, error) { + conn, err := slave.Connect(dsn, sc) + if err != nil { + return nil, errors.Annotate(err, "establish slave connection") + } + r := &Reader{ conn: conn, tableMap: make(map[uint64]binlog.TableDescription), @@ -45,7 +51,7 @@ func NewReader(conn *SlaveConn) (*Reader, error) { return r, nil } -// ReadEvent ... +// ReadEvent reads next event from the binary log. func (r *Reader) ReadEvent() (*Event, error) { connBuff, err := r.conn.ReadPacket() if err != nil { diff --git a/reader/slave_conn.go b/reader/slave/slave_conn.go similarity index 82% rename from reader/slave_conn.go rename to reader/slave/slave_conn.go index 0da9718..d49231c 100644 --- a/reader/slave_conn.go +++ b/reader/slave/slave_conn.go @@ -1,4 +1,4 @@ -package reader +package slave import ( "database/sql/driver" @@ -10,13 +10,14 @@ import ( "github.com/localhots/mysql" ) -// SlaveConn ... -type SlaveConn struct { +// Conn is a slave connection used to issue a binlog dump command. +type Conn struct { conn *mysql.ExtendedConn conf Config } -// Config ... +// Config contains slave connection configuration. It is passed to master upon +// registration. type Config struct { ServerID uint32 File string @@ -35,8 +36,8 @@ const ( resultERR byte = 0xFF ) -// Connect ... -func Connect(dsn string, conf Config) (*SlaveConn, error) { +// Connect esablishes a new slave connection. +func Connect(dsn string, conf Config) (*Conn, error) { if conf.Hostname == "" { name, err := os.Hostname() if err != nil { @@ -59,12 +60,12 @@ func Connect(dsn string, conf Config) (*SlaveConn, error) { return nil, err } - return &SlaveConn{conn: extconn, conf: conf}, nil + return &Conn{conn: extconn, conf: conf}, nil } // ReadPacket reads next packet from the server and processes the first status // byte. -func (c *SlaveConn) ReadPacket() ([]byte, error) { +func (c *Conn) ReadPacket() ([]byte, error) { data, err := c.conn.ReadPacket() if err != nil { return nil, err @@ -84,7 +85,7 @@ func (c *SlaveConn) ReadPacket() ([]byte, error) { // RegisterSlave issues a REGISTER_SLAVE command to master. // Spec: https://dev.mysql.com/doc/internals/en/com-register-slave.html -func (c *SlaveConn) RegisterSlave() error { +func (c *Conn) RegisterSlave() error { c.conn.ResetSequence() buf := tools.NewCommandBuffer(1 + 4 + 1 + len(c.conf.Hostname) + 1 + 1 + 2 + 4 + 4) @@ -106,7 +107,7 @@ func (c *SlaveConn) RegisterSlave() error { // StartBinlogDump issues a BINLOG_DUMP command to master. // Spec: https://dev.mysql.com/doc/internals/en/com-binlog-dump.html // TODO: https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html -func (c *SlaveConn) StartBinlogDump() error { +func (c *Conn) StartBinlogDump() error { c.conn.ResetSequence() buf := tools.NewCommandBuffer(1 + 4 + 2 + 4 + len(c.conf.File)) @@ -120,7 +121,7 @@ func (c *SlaveConn) StartBinlogDump() error { } // DisableChecksum disables CRC32 checksums for this connection. -func (c *SlaveConn) DisableChecksum() error { +func (c *Conn) DisableChecksum() error { cs, err := c.GetVar("BINLOG_CHECKSUM") if err != nil { return err @@ -133,7 +134,7 @@ func (c *SlaveConn) DisableChecksum() error { } // GetVar fetches value of the given variable. -func (c *SlaveConn) GetVar(name string) (string, error) { +func (c *Conn) GetVar(name string) (string, error) { rows, err := c.conn.Query(fmt.Sprintf("SHOW VARIABLES LIKE %q", name), []driver.Value{}) if err != nil { return "", notEOF(err) @@ -150,11 +151,11 @@ func (c *SlaveConn) GetVar(name string) (string, error) { } // SetVar assigns a new value to the given variable. -func (c *SlaveConn) SetVar(name, val string) error { +func (c *Conn) SetVar(name, val string) error { return c.conn.Exec(fmt.Sprintf("SET %s=%q", name, val)) } -func (c *SlaveConn) runCmd(data []byte) error { +func (c *Conn) runCmd(data []byte) error { err := c.conn.WritePacket(data) if err != nil { return err diff --git a/tests/integration_test.go b/tests/integration_test.go index 7893709..7cad2ab 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -11,6 +11,7 @@ import ( "github.com/localhots/bocadillo/binlog" "github.com/localhots/bocadillo/reader" + "github.com/localhots/bocadillo/reader/slave" ) var suite *testSuite @@ -30,12 +31,7 @@ func TestMain(m *testing.M) { conf.Offset = uint32(pos.Offset) } - slaveConn, err := reader.Connect(dsn, conf) - if err != nil { - log.Fatal(err) - } - - rdr, err := reader.NewReader(slaveConn) + rdr, err := reader.New(dsn, conf) if err != nil { log.Fatal(err) } @@ -50,7 +46,7 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func getConfig() (dsn string, conf reader.Config) { +func getConfig() (dsn string, conf slave.Config) { envOrDefault := func(name, def string) string { if val := os.Getenv(name); val != "" { return val