1
0
Fork 0
bocadillo/reader/reader.go

168 lines
4.4 KiB
Go
Raw Permalink Normal View History

package reader
import (
"context"
"github.com/juju/errors"
2018-11-06 22:17:48 +00:00
"github.com/localhots/bocadillo/binlog"
"github.com/localhots/bocadillo/mysql/driver"
)
2018-11-11 14:26:40 +00:00
// Reader is a binary log reader.
type Reader struct {
conn *driver.Conn
state binlog.Position
format binlog.FormatDescription
tableMap map[uint64]binlog.TableDescription
}
2018-11-11 14:26:40 +00:00
// Event contains binlog event details.
type Event struct {
2018-11-11 13:24:45 +00:00
Format binlog.FormatDescription
Header binlog.EventHeader
2018-11-11 13:24:45 +00:00
Buffer []byte
2018-11-14 14:52:44 +00:00
Offset uint64
2018-11-11 13:24:45 +00:00
// Table is not empty for rows events
Table *binlog.TableDescription
}
2018-11-14 14:52:44 +00:00
var (
// ErrUnknownTableID is returned when a table ID from a rows event is
// missing in the table map index.
ErrUnknownTableID = errors.New("Unknown table ID")
)
2018-11-11 14:26:40 +00:00
// New creates a new binary log reader.
func New(dsn string, sc driver.Config) (*Reader, error) {
conn, err := driver.Connect(dsn, sc)
2018-11-11 14:26:40 +00:00
if err != nil {
2020-07-27 22:12:19 +00:00
return nil, errors.Annotate(err, "establish connection")
2018-11-11 14:26:40 +00:00
}
2018-11-11 19:32:59 +00:00
r := &Reader{
conn: conn,
2018-11-11 19:33:29 +00:00
state: binlog.Position{
File: sc.File,
Offset: uint64(sc.Offset),
},
2018-11-11 19:32:59 +00:00
}
r.initTableMap()
if err := conn.DisableChecksum(); err != nil {
return nil, errors.Annotate(err, "disable binlog checksum")
}
if err := conn.RegisterSlave(); err != nil {
2020-07-27 22:12:19 +00:00
return nil, errors.Annotate(err, "register replica server")
}
if err := conn.StartBinlogDump(); err != nil {
return nil, errors.Annotate(err, "start binlog dump")
}
return r, nil
}
2018-11-11 14:26:40 +00:00
// ReadEvent reads next event from the binary log.
func (r *Reader) ReadEvent(ctx context.Context) (*Event, error) {
connBuff, err := r.conn.ReadPacket(ctx)
if err != nil {
2018-11-11 13:24:45 +00:00
return nil, errors.Annotate(err, "read next event")
}
2018-11-14 14:56:08 +00:00
evt := Event{Format: r.format, Offset: r.state.Offset}
if err := evt.Header.Decode(connBuff, r.format); err != nil {
return nil, errors.Annotate(err, "decode event header")
}
if evt.Header.NextOffset > 0 {
r.state.Offset = uint64(evt.Header.NextOffset)
}
2018-11-11 13:24:45 +00:00
evt.Buffer = connBuff[r.format.HeaderLen():]
csa := r.format.ServerDetails.ChecksumAlgorithm
if evt.Header.Type != binlog.EventTypeFormatDescription && csa == binlog.ChecksumAlgorithmCRC32 {
2018-11-11 13:24:45 +00:00
// Remove trailing CRC32 checksum, we're not going to verify it
evt.Buffer = evt.Buffer[:len(evt.Buffer)-4]
}
switch evt.Header.Type {
case binlog.EventTypeFormatDescription:
var fde binlog.FormatDescriptionEvent
if err := fde.Decode(evt.Buffer); err != nil {
2018-11-11 13:24:45 +00:00
return nil, errors.Annotate(err, "decode format description event")
}
2018-11-11 13:24:45 +00:00
r.format = fde.FormatDescription
evt.Format = fde.FormatDescription
case binlog.EventTypeRotate:
var re binlog.RotateEvent
if err := re.Decode(evt.Buffer, r.format); err != nil {
2018-11-11 13:24:45 +00:00
return nil, errors.Annotate(err, "decode rotate event")
}
2018-11-11 13:24:45 +00:00
r.state = re.NextFile
case binlog.EventTypeTableMap:
var tme binlog.TableMapEvent
if err := tme.Decode(evt.Buffer, r.format); err != nil {
2018-11-11 13:24:45 +00:00
return nil, errors.Annotate(err, "decode table map event")
}
2018-11-11 13:24:45 +00:00
r.tableMap[tme.TableID] = tme.TableDescription
case binlog.EventTypeWriteRowsV0,
binlog.EventTypeWriteRowsV1,
binlog.EventTypeWriteRowsV2,
binlog.EventTypeUpdateRowsV0,
binlog.EventTypeUpdateRowsV1,
binlog.EventTypeUpdateRowsV2,
binlog.EventTypeDeleteRowsV0,
binlog.EventTypeDeleteRowsV1,
binlog.EventTypeDeleteRowsV2:
re := binlog.RowsEvent{Type: evt.Header.Type}
tableID, flags := re.PeekTableIDAndFlags(evt.Buffer, r.format)
td, ok := r.tableMap[tableID]
if !ok {
2018-11-14 14:52:44 +00:00
return nil, ErrUnknownTableID
}
2018-11-08 19:47:49 +00:00
evt.Table = &td
// Throttle table map clearing. This flag could be part of every single
// rows event
if binlog.RowsFlagEndOfStatement&flags > 0 && len(r.tableMap) > 100 {
// Clear table map
r.initTableMap()
}
2018-11-11 13:24:45 +00:00
case binlog.EventTypeQuery:
// Can be decoded by the receiver
case binlog.EventTypeXID:
2018-11-11 13:24:45 +00:00
// Can be decoded by the receiver
case binlog.EventTypeGTID:
// TODO: Add support
}
return &evt, err
}
2018-11-11 13:33:26 +00:00
2018-11-11 19:33:29 +00:00
// State returns current position in the binary log.
func (r *Reader) State() binlog.Position {
return r.state
}
2018-11-11 19:32:59 +00:00
// Close underlying database connection.
func (r *Reader) Close() error {
return r.conn.Close()
}
func (r *Reader) initTableMap() {
r.tableMap = make(map[uint64]binlog.TableDescription)
}
2018-11-11 13:33:26 +00:00
// DecodeRows decodes buffer into a rows event.
func (e Event) DecodeRows() (binlog.RowsEvent, error) {
re := binlog.RowsEvent{Type: e.Header.Type}
if binlog.RowsEventVersion(e.Header.Type) < 0 {
return re, errors.New("invalid rows event")
}
2018-11-11 19:35:31 +00:00
err := re.Decode(e.Buffer, e.Format, *e.Table)
return re, err
2018-11-11 13:33:26 +00:00
}