Move buffer package
This commit is contained in:
parent
ccce5ea0bb
commit
08c7552515
@ -5,7 +5,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/localhots/bocadillo/tools"
|
"github.com/localhots/bocadillo/buffer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FormatDescription is a description of binary log format.
|
// FormatDescription is a description of binary log format.
|
||||||
@ -52,7 +52,7 @@ const (
|
|||||||
// Decode decodes given buffer into a format description event.
|
// Decode decodes given buffer into a format description event.
|
||||||
// Spec: https://dev.mysql.com/doc/internals/en/format-description-event.html
|
// Spec: https://dev.mysql.com/doc/internals/en/format-description-event.html
|
||||||
func (e *FormatDescriptionEvent) Decode(data []byte) error {
|
func (e *FormatDescriptionEvent) Decode(data []byte) error {
|
||||||
buf := tools.NewBuffer(data)
|
buf := buffer.New(data)
|
||||||
e.Version = buf.ReadUint16()
|
e.Version = buf.ReadUint16()
|
||||||
e.ServerVersion = trimStringEOF(buf.ReadStringVarLen(50))
|
e.ServerVersion = trimStringEOF(buf.ReadStringVarLen(50))
|
||||||
e.CreateTimestamp = buf.ReadUint32()
|
e.CreateTimestamp = buf.ReadUint32()
|
||||||
|
@ -3,7 +3,7 @@ package binlog
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/localhots/bocadillo/tools"
|
"github.com/localhots/bocadillo/buffer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -30,7 +30,7 @@ func (h *EventHeader) Decode(connBuff []byte, fd FormatDescription) error {
|
|||||||
return ErrInvalidHeader
|
return ErrInvalidHeader
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := tools.NewBuffer(connBuff)
|
buf := buffer.New(connBuff)
|
||||||
h.Timestamp = buf.ReadUint32()
|
h.Timestamp = buf.ReadUint32()
|
||||||
h.Type = EventType(buf.ReadUint8())
|
h.Type = EventType(buf.ReadUint8())
|
||||||
h.ServerID = buf.ReadUint32()
|
h.ServerID = buf.ReadUint32()
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package binlog
|
package binlog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/localhots/bocadillo/tools"
|
"github.com/localhots/bocadillo/buffer"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueryEvent contains query details.
|
// QueryEvent contains query details.
|
||||||
@ -17,7 +17,7 @@ type QueryEvent struct {
|
|||||||
// Decode given buffer into a qeury event.
|
// Decode given buffer into a qeury event.
|
||||||
// Spec: https://dev.mysql.com/doc/internals/en/query-event.html
|
// Spec: https://dev.mysql.com/doc/internals/en/query-event.html
|
||||||
func (e *QueryEvent) Decode(connBuff []byte) {
|
func (e *QueryEvent) Decode(connBuff []byte) {
|
||||||
buf := tools.NewBuffer(connBuff)
|
buf := buffer.New(connBuff)
|
||||||
|
|
||||||
e.SlaveProxyID = buf.ReadUint32()
|
e.SlaveProxyID = buf.ReadUint32()
|
||||||
e.ExecutionTime = buf.ReadUint32()
|
e.ExecutionTime = buf.ReadUint32()
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package binlog
|
package binlog
|
||||||
|
|
||||||
import "github.com/localhots/bocadillo/tools"
|
import "github.com/localhots/bocadillo/buffer"
|
||||||
|
|
||||||
// Position is a pair of log file name and a binary offset in it that is used to
|
// Position is a pair of log file name and a binary offset in it that is used to
|
||||||
// represent the beginning of the event description.
|
// represent the beginning of the event description.
|
||||||
@ -18,7 +18,7 @@ type RotateEvent struct {
|
|||||||
// Decode decodes given buffer into a rotate event.
|
// Decode decodes given buffer into a rotate event.
|
||||||
// Spec: https://dev.mysql.com/doc/internals/en/rotate-event.html
|
// Spec: https://dev.mysql.com/doc/internals/en/rotate-event.html
|
||||||
func (e *RotateEvent) Decode(connBuff []byte, fd FormatDescription) error {
|
func (e *RotateEvent) Decode(connBuff []byte, fd FormatDescription) error {
|
||||||
buf := tools.NewBuffer(connBuff)
|
buf := buffer.New(connBuff)
|
||||||
if fd.Version > 1 {
|
if fd.Version > 1 {
|
||||||
e.NextFile.Offset = buf.ReadUint64()
|
e.NextFile.Offset = buf.ReadUint64()
|
||||||
} else {
|
} else {
|
||||||
|
@ -6,8 +6,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
|
||||||
|
"github.com/localhots/bocadillo/buffer"
|
||||||
"github.com/localhots/bocadillo/mysql"
|
"github.com/localhots/bocadillo/mysql"
|
||||||
"github.com/localhots/bocadillo/tools"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// RowsEvent contains a Rows Event.
|
// RowsEvent contains a Rows Event.
|
||||||
@ -58,7 +58,7 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
buf := tools.NewBuffer(connBuff)
|
buf := buffer.New(connBuff)
|
||||||
idSize := fd.TableIDSize(e.Type)
|
idSize := fd.TableIDSize(e.Type)
|
||||||
if idSize == 6 {
|
if idSize == 6 {
|
||||||
e.TableID = buf.ReadUint48()
|
e.TableID = buf.ReadUint48()
|
||||||
@ -103,7 +103,7 @@ func (e *RowsEvent) Decode(connBuff []byte, fd FormatDescription, td TableDescri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte) ([]interface{}, error) {
|
func (e *RowsEvent) decodeRows(buf *buffer.Buffer, td TableDescription, bm []byte) ([]interface{}, error) {
|
||||||
count := 0
|
count := 0
|
||||||
for i := 0; i < int(e.ColumnCount); i++ {
|
for i := 0; i < int(e.ColumnCount); i++ {
|
||||||
if isBitSet(bm, i) {
|
if isBitSet(bm, i) {
|
||||||
@ -132,7 +132,7 @@ func (e *RowsEvent) decodeRows(buf *tools.Buffer, td TableDescription, bm []byte
|
|||||||
return row, nil
|
return row, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uint16) interface{} {
|
func (e *RowsEvent) decodeValue(buf *buffer.Buffer, ct mysql.ColumnType, meta uint16) interface{} {
|
||||||
var length int
|
var length int
|
||||||
if ct == mysql.ColumnTypeString {
|
if ct == mysql.ColumnTypeString {
|
||||||
if meta > 0xFF {
|
if meta > 0xFF {
|
||||||
@ -251,7 +251,7 @@ func (e *RowsEvent) decodeValue(buf *tools.Buffer, ct mysql.ColumnType, meta uin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func readString(buf *tools.Buffer, length int) string {
|
func readString(buf *buffer.Buffer, length int) string {
|
||||||
// Length is encoded in 1 byte
|
// Length is encoded in 1 byte
|
||||||
if length < 256 {
|
if length < 256 {
|
||||||
return string(buf.ReadStringVarEnc(1))
|
return string(buf.ReadStringVarEnc(1))
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package binlog
|
package binlog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/localhots/bocadillo/buffer"
|
||||||
"github.com/localhots/bocadillo/mysql"
|
"github.com/localhots/bocadillo/mysql"
|
||||||
"github.com/localhots/bocadillo/tools"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TableDescription contains table details required to process rows events.
|
// TableDescription contains table details required to process rows events.
|
||||||
@ -26,7 +26,7 @@ type TableMapEvent struct {
|
|||||||
// Decode decodes given buffer into a table map event.
|
// Decode decodes given buffer into a table map event.
|
||||||
// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html
|
// Spec: https://dev.mysql.com/doc/internals/en/table-map-event.html
|
||||||
func (e *TableMapEvent) Decode(connBuff []byte, fd FormatDescription) error {
|
func (e *TableMapEvent) Decode(connBuff []byte, fd FormatDescription) error {
|
||||||
buf := tools.NewBuffer(connBuff)
|
buf := buffer.New(connBuff)
|
||||||
idSize := fd.TableIDSize(EventTypeTableMap)
|
idSize := fd.TableIDSize(EventTypeTableMap)
|
||||||
if idSize == 6 {
|
if idSize == 6 {
|
||||||
e.TableID = buf.ReadUint48()
|
e.TableID = buf.ReadUint48()
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package tools
|
package buffer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
@ -13,9 +13,9 @@ type Buffer struct {
|
|||||||
pos int
|
pos int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBuffer creates a new buffer from a given slice of bytes and sets the
|
// New creates a new buffer from a given slice of bytes and sets the cursor to
|
||||||
// cursor to the beginning.
|
// the beginning.
|
||||||
func NewBuffer(data []byte) *Buffer {
|
func New(data []byte) *Buffer {
|
||||||
return &Buffer{data: data}
|
return &Buffer{data: data}
|
||||||
}
|
}
|
||||||
|
|
@ -6,8 +6,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/localhots/bocadillo/buffer"
|
||||||
"github.com/localhots/bocadillo/mysql/driver"
|
"github.com/localhots/bocadillo/mysql/driver"
|
||||||
"github.com/localhots/bocadillo/tools"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Conn is a slave connection used to issue a binlog dump command.
|
// Conn is a slave connection used to issue a binlog dump command.
|
||||||
@ -83,7 +83,7 @@ func (c *Conn) ReadPacket(ctx context.Context) ([]byte, error) {
|
|||||||
func (c *Conn) RegisterSlave() error {
|
func (c *Conn) RegisterSlave() error {
|
||||||
c.conn.ResetSequence()
|
c.conn.ResetSequence()
|
||||||
|
|
||||||
buf := tools.NewCommandBuffer(1 + 4 + 1 + len(c.conf.Hostname) + 1 + 1 + 2 + 4 + 4)
|
buf := buffer.NewCommandBuffer(1 + 4 + 1 + len(c.conf.Hostname) + 1 + 1 + 2 + 4 + 4)
|
||||||
buf.WriteByte(comRegisterSlave)
|
buf.WriteByte(comRegisterSlave)
|
||||||
buf.WriteUint32(c.conf.ServerID)
|
buf.WriteUint32(c.conf.ServerID)
|
||||||
buf.WriteStringLenEnc(c.conf.Hostname)
|
buf.WriteStringLenEnc(c.conf.Hostname)
|
||||||
@ -105,7 +105,7 @@ func (c *Conn) RegisterSlave() error {
|
|||||||
func (c *Conn) StartBinlogDump() error {
|
func (c *Conn) StartBinlogDump() error {
|
||||||
c.conn.ResetSequence()
|
c.conn.ResetSequence()
|
||||||
|
|
||||||
buf := tools.NewCommandBuffer(1 + 4 + 2 + 4 + len(c.conf.File))
|
buf := buffer.NewCommandBuffer(1 + 4 + 2 + 4 + len(c.conf.File))
|
||||||
buf.WriteByte(comBinlogDump)
|
buf.WriteByte(comBinlogDump)
|
||||||
buf.WriteUint32(uint32(c.conf.Offset))
|
buf.WriteUint32(uint32(c.conf.Offset))
|
||||||
buf.Skip(2) // Flags
|
buf.Skip(2) // Flags
|
||||||
|
Loading…
x
Reference in New Issue
Block a user