diff --git a/schema/manager.go b/schema/manager.go new file mode 100644 index 0000000..09ef8b2 --- /dev/null +++ b/schema/manager.go @@ -0,0 +1,73 @@ +package schema + +import ( + "database/sql" + "strings" +) + +// SchemaManager maintains table schemas. +type SchemaManager struct { + Schema *Schema + db *sql.DB +} + +// NewManager creates a new schema manager. +func NewManager(db *sql.DB) *SchemaManager { + return &SchemaManager{ + Schema: NewSchema(), + db: db, + } +} + +// Manage adds given tables to a list of managed tables and updates its details. +func (m *SchemaManager) Manage(database, table string) error { + cols, err := m.tableColumns(database, table) + if err != nil { + return err + } + + m.Schema.Update(database, table, cols) + return nil +} + +// ProcessQuery accepts an SQL query and updates schema if required. +func (m *SchemaManager) ProcessQuery(query string) error { + if strings.HasPrefix(query, "ALTER TABLE") { + for database, tables := range m.Schema.tables { + for table := range tables { + if err := m.Manage(database, table); err != nil { + return err + } + } + } + } + return nil +} + +func (m *SchemaManager) tableColumns(database, table string) ([]Column, error) { + rows, err := m.db.Query(` + SELECT COLUMN_NAME, COLUMN_TYPE + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? + ORDER BY ORDINAL_POSITION ASC + `, database, table) + if err != nil { + return nil, err + } + defer rows.Close() + + cols := make([]Column, 0) + for rows.Next() { + var col Column + var typ string + err := rows.Scan(&col.Name, &typ) + if err != nil { + return nil, err + } + if strings.Contains(strings.ToLower(typ), "unsigned") { + col.Unsigned = true + } + cols = append(cols, col) + } + return cols, nil +} diff --git a/schema/schema.go b/schema/schema.go new file mode 100644 index 0000000..2986d63 --- /dev/null +++ b/schema/schema.go @@ -0,0 +1,53 @@ +package schema + +// Schema contains table definitions. +type Schema struct { + tables map[string]map[string]Table +} + +// Table is a list of columns. +type Table struct { + columns []Column +} + +// Column carries two key column parameters that are not available in the binary +// log of older versions of MySQL. +type Column struct { + Name string + // Unsigned is true if the column is of integer or decimal types and is + // unsigned. + Unsigned bool +} + +// NewSchema creates a new managed schema object. +func NewSchema() *Schema { + return &Schema{tables: make(map[string]map[string]Table)} +} + +// Table returns table details for a given database and table name pair. If the +// table can't be found nil is returned. +func (s Schema) Table(database, table string) *Table { + if d, ok := s.tables[database]; ok { + if t, ok := d[table]; ok { + return &t + } + } + return nil +} + +// Update sets new column definitions for a given database and table name pair. +func (s Schema) Update(database, table string, cols []Column) { + if _, ok := s.tables[database]; !ok { + s.tables[database] = make(map[string]Table) + } + s.tables[database][table] = Table{columns: cols} +} + +// Column returns column details for the given column index. If index is out of +// range nil is returned. +func (t Table) Column(i int) *Column { + if i >= 0 && i < len(t.columns) { + return &t.columns[i] + } + return nil +}