From 4caf5728112d169317542b3f531c61c8dd04db61 Mon Sep 17 00:00:00 2001 From: Gregory Eremin Date: Sun, 11 Nov 2018 16:02:11 +0100 Subject: [PATCH] Clear table map if end-of-statement flag is set --- binlog/event_rows.go | 24 +++++++++++++----------- reader/reader.go | 27 ++++++++++++++++----------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/binlog/event_rows.go b/binlog/event_rows.go index c5c8cf7..bc6f814 100644 --- a/binlog/event_rows.go +++ b/binlog/event_rows.go @@ -15,7 +15,7 @@ import ( type RowsEvent struct { Type EventType TableID uint64 - Flags uint16 + Flags RowsFlag ExtraData []byte ColumnCount uint64 ColumnBitmap1 []byte @@ -23,23 +23,25 @@ type RowsEvent struct { Rows [][]interface{} } -type rowsFlag uint16 +// RowsFlag is bitmask of flags. +type RowsFlag uint16 const ( - rowsFlagEndOfStatement rowsFlag = 0x0001 - rowsFlagNoForeignKeyChecks rowsFlag = 0x0002 - rowsFlagNoUniqueKeyChecks rowsFlag = 0x0004 - rowsFlagRowHasColumns rowsFlag = 0x0008 + // RowsFlagEndOfStatement is used to clear old table mappings. + RowsFlagEndOfStatement RowsFlag = 0x0001 + rowsFlagNoForeignKeyChecks RowsFlag = 0x0002 + rowsFlagNoUniqueKeyChecks RowsFlag = 0x0004 + rowsFlagRowHasColumns RowsFlag = 0x0008 freeTableMapID = 0x00FFFFFF ) -// PeekTableID returns table ID without decoding whole event. -func (e *RowsEvent) PeekTableID(connBuff []byte, fd FormatDescription) uint64 { +// PeekTableIDAndFlags returns table ID and flags without decoding whole event. +func (e *RowsEvent) PeekTableIDAndFlags(connBuff []byte, fd FormatDescription) (uint64, RowsFlag) { if fd.TableIDSize(e.Type) == 6 { - return mysql.DecodeUint48(connBuff) + return mysql.DecodeUint48(connBuff), RowsFlag(mysql.DecodeUint16(connBuff[6:])) } - return uint64(mysql.DecodeUint32(connBuff)) + return uint64(mysql.DecodeUint32(connBuff)), RowsFlag(mysql.DecodeUint16(connBuff[4:])) } // Decode decodes given buffer into a rows event event. @@ -71,7 +73,7 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri e.TableID = uint64(buf.ReadUint32()) } - e.Flags = buf.ReadUint16() + e.Flags = RowsFlag(buf.ReadUint16()) if RowsEventHasExtraData(e.Type) { // Extra data length is part of extra data, deduct 2 bytes as they diff --git a/reader/reader.go b/reader/reader.go index 9a52694..74009b0 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -33,10 +33,8 @@ func New(dsn string, sc slave.Config) (*Reader, error) { return nil, errors.Annotate(err, "establish slave connection") } - r := &Reader{ - conn: conn, - tableMap: make(map[uint64]binlog.TableDescription), - } + r := &Reader{conn: conn} + r.initTableMap() if err := conn.DisableChecksum(); err != nil { return nil, errors.Annotate(err, "disable binlog checksum") @@ -76,8 +74,7 @@ func (r *Reader) ReadEvent() (*Event, error) { switch evt.Header.Type { case binlog.EventTypeFormatDescription: var fde binlog.FormatDescriptionEvent - err := fde.Decode(evt.Buffer) - if err != nil { + if err := fde.Decode(evt.Buffer); err != nil { return nil, errors.Annotate(err, "decode format description event") } r.format = fde.FormatDescription @@ -85,16 +82,14 @@ func (r *Reader) ReadEvent() (*Event, error) { case binlog.EventTypeRotate: var re binlog.RotateEvent - err := re.Decode(evt.Buffer, r.format) - if err != nil { + if err := re.Decode(evt.Buffer, r.format); err != nil { return nil, errors.Annotate(err, "decode rotate event") } r.state = re.NextFile case binlog.EventTypeTableMap: var tme binlog.TableMapEvent - err := tme.Decode(evt.Buffer, r.format) - if err != nil { + if err := tme.Decode(evt.Buffer, r.format); err != nil { return nil, errors.Annotate(err, "decode table map event") } r.tableMap[tme.TableID] = tme.TableDescription @@ -110,13 +105,19 @@ func (r *Reader) ReadEvent() (*Event, error) { binlog.EventTypeDeleteRowsV2: re := binlog.RowsEvent{Type: evt.Header.Type} - tableID := re.PeekTableID(evt.Buffer, r.format) + tableID, flags := re.PeekTableIDAndFlags(evt.Buffer, r.format) td, ok := r.tableMap[tableID] if !ok { return nil, errors.New("Unknown table ID") } evt.Table = &td + // Throttle table map clearing. This flag could be part of every single + // rows event + if binlog.RowsFlagEndOfStatement&flags > 0 && len(r.tableMap) > 100 { + // Clear table map + r.initTableMap() + } case binlog.EventTypeQuery: // Can be decoded by the receiver case binlog.EventTypeXID: @@ -128,6 +129,10 @@ func (r *Reader) ReadEvent() (*Event, error) { return &evt, err } +func (r *Reader) initTableMap() { + r.tableMap = make(map[uint64]binlog.TableDescription) +} + // DecodeRows decodes buffer into a rows event. func (e Event) DecodeRows() (binlog.RowsEvent, error) { re := binlog.RowsEvent{Type: e.Header.Type}