Clear table map if end-of-statement flag is set
This commit is contained in:
		
							parent
							
								
									6f61469eea
								
							
						
					
					
						commit
						4caf572811
					
				@ -15,7 +15,7 @@ import (
 | 
			
		||||
type RowsEvent struct {
 | 
			
		||||
	Type          EventType
 | 
			
		||||
	TableID       uint64
 | 
			
		||||
	Flags         uint16
 | 
			
		||||
	Flags         RowsFlag
 | 
			
		||||
	ExtraData     []byte
 | 
			
		||||
	ColumnCount   uint64
 | 
			
		||||
	ColumnBitmap1 []byte
 | 
			
		||||
@ -23,23 +23,25 @@ type RowsEvent struct {
 | 
			
		||||
	Rows          [][]interface{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type rowsFlag uint16
 | 
			
		||||
// RowsFlag is bitmask of flags.
 | 
			
		||||
type RowsFlag uint16
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	rowsFlagEndOfStatement     rowsFlag = 0x0001
 | 
			
		||||
	rowsFlagNoForeignKeyChecks rowsFlag = 0x0002
 | 
			
		||||
	rowsFlagNoUniqueKeyChecks  rowsFlag = 0x0004
 | 
			
		||||
	rowsFlagRowHasColumns      rowsFlag = 0x0008
 | 
			
		||||
	// RowsFlagEndOfStatement is used to clear old table mappings.
 | 
			
		||||
	RowsFlagEndOfStatement     RowsFlag = 0x0001
 | 
			
		||||
	rowsFlagNoForeignKeyChecks RowsFlag = 0x0002
 | 
			
		||||
	rowsFlagNoUniqueKeyChecks  RowsFlag = 0x0004
 | 
			
		||||
	rowsFlagRowHasColumns      RowsFlag = 0x0008
 | 
			
		||||
 | 
			
		||||
	freeTableMapID = 0x00FFFFFF
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// PeekTableID returns table ID without decoding whole event.
 | 
			
		||||
func (e *RowsEvent) PeekTableID(connBuff []byte, fd FormatDescription) uint64 {
 | 
			
		||||
// PeekTableIDAndFlags returns table ID and flags without decoding whole event.
 | 
			
		||||
func (e *RowsEvent) PeekTableIDAndFlags(connBuff []byte, fd FormatDescription) (uint64, RowsFlag) {
 | 
			
		||||
	if fd.TableIDSize(e.Type) == 6 {
 | 
			
		||||
		return mysql.DecodeUint48(connBuff)
 | 
			
		||||
		return mysql.DecodeUint48(connBuff), RowsFlag(mysql.DecodeUint16(connBuff[6:]))
 | 
			
		||||
	}
 | 
			
		||||
	return uint64(mysql.DecodeUint32(connBuff))
 | 
			
		||||
	return uint64(mysql.DecodeUint32(connBuff)), RowsFlag(mysql.DecodeUint16(connBuff[4:]))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Decode decodes given buffer into a rows event event.
 | 
			
		||||
@ -71,7 +73,7 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri
 | 
			
		||||
		e.TableID = uint64(buf.ReadUint32())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	e.Flags = buf.ReadUint16()
 | 
			
		||||
	e.Flags = RowsFlag(buf.ReadUint16())
 | 
			
		||||
 | 
			
		||||
	if RowsEventHasExtraData(e.Type) {
 | 
			
		||||
		// Extra data length is part of extra data, deduct 2 bytes as they
 | 
			
		||||
 | 
			
		||||
@ -33,10 +33,8 @@ func New(dsn string, sc slave.Config) (*Reader, error) {
 | 
			
		||||
		return nil, errors.Annotate(err, "establish slave connection")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	r := &Reader{
 | 
			
		||||
		conn:     conn,
 | 
			
		||||
		tableMap: make(map[uint64]binlog.TableDescription),
 | 
			
		||||
	}
 | 
			
		||||
	r := &Reader{conn: conn}
 | 
			
		||||
	r.initTableMap()
 | 
			
		||||
 | 
			
		||||
	if err := conn.DisableChecksum(); err != nil {
 | 
			
		||||
		return nil, errors.Annotate(err, "disable binlog checksum")
 | 
			
		||||
@ -76,8 +74,7 @@ func (r *Reader) ReadEvent() (*Event, error) {
 | 
			
		||||
	switch evt.Header.Type {
 | 
			
		||||
	case binlog.EventTypeFormatDescription:
 | 
			
		||||
		var fde binlog.FormatDescriptionEvent
 | 
			
		||||
		err := fde.Decode(evt.Buffer)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		if err := fde.Decode(evt.Buffer); err != nil {
 | 
			
		||||
			return nil, errors.Annotate(err, "decode format description event")
 | 
			
		||||
		}
 | 
			
		||||
		r.format = fde.FormatDescription
 | 
			
		||||
@ -85,16 +82,14 @@ func (r *Reader) ReadEvent() (*Event, error) {
 | 
			
		||||
 | 
			
		||||
	case binlog.EventTypeRotate:
 | 
			
		||||
		var re binlog.RotateEvent
 | 
			
		||||
		err := re.Decode(evt.Buffer, r.format)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		if err := re.Decode(evt.Buffer, r.format); err != nil {
 | 
			
		||||
			return nil, errors.Annotate(err, "decode rotate event")
 | 
			
		||||
		}
 | 
			
		||||
		r.state = re.NextFile
 | 
			
		||||
 | 
			
		||||
	case binlog.EventTypeTableMap:
 | 
			
		||||
		var tme binlog.TableMapEvent
 | 
			
		||||
		err := tme.Decode(evt.Buffer, r.format)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		if err := tme.Decode(evt.Buffer, r.format); err != nil {
 | 
			
		||||
			return nil, errors.Annotate(err, "decode table map event")
 | 
			
		||||
		}
 | 
			
		||||
		r.tableMap[tme.TableID] = tme.TableDescription
 | 
			
		||||
@ -110,13 +105,19 @@ func (r *Reader) ReadEvent() (*Event, error) {
 | 
			
		||||
		binlog.EventTypeDeleteRowsV2:
 | 
			
		||||
 | 
			
		||||
		re := binlog.RowsEvent{Type: evt.Header.Type}
 | 
			
		||||
		tableID := re.PeekTableID(evt.Buffer, r.format)
 | 
			
		||||
		tableID, flags := re.PeekTableIDAndFlags(evt.Buffer, r.format)
 | 
			
		||||
		td, ok := r.tableMap[tableID]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, errors.New("Unknown table ID")
 | 
			
		||||
		}
 | 
			
		||||
		evt.Table = &td
 | 
			
		||||
 | 
			
		||||
		// Throttle table map clearing. This flag could be part of every single
 | 
			
		||||
		// rows event
 | 
			
		||||
		if binlog.RowsFlagEndOfStatement&flags > 0 && len(r.tableMap) > 100 {
 | 
			
		||||
			// Clear table map
 | 
			
		||||
			r.initTableMap()
 | 
			
		||||
		}
 | 
			
		||||
	case binlog.EventTypeQuery:
 | 
			
		||||
		// Can be decoded by the receiver
 | 
			
		||||
	case binlog.EventTypeXID:
 | 
			
		||||
@ -128,6 +129,10 @@ func (r *Reader) ReadEvent() (*Event, error) {
 | 
			
		||||
	return &evt, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Reader) initTableMap() {
 | 
			
		||||
	r.tableMap = make(map[uint64]binlog.TableDescription)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DecodeRows decodes buffer into a rows event.
 | 
			
		||||
func (e Event) DecodeRows() (binlog.RowsEvent, error) {
 | 
			
		||||
	re := binlog.RowsEvent{Type: e.Header.Type}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user