
My name is Artem, and I work at Rambler Group as a Go lead developer on the "Flow" project.
We spent a lot of time taming the MySQL binlog. This article is the story of how to get a binlog-processing mechanism up and running in Go quickly and with a minimum of pitfalls.
Why do we need it?
Under the hood of Flow there are high-load modules, where every query to the database keeps the user further away from the result. Caching is a good solution, but when should the cache be flushed? Let the data itself tell us that it has been updated.
MySQL has master-slave replication. Our daemon can pretend to be a slave and receive data via the binlog. The binlog must be configured in ROW format. It contains all the commands that change the database; commands inside a transaction appear only after the commit. When a file reaches the maximum allowed size (1 GB by default), the next file is created, and each new file gets a sequence number appended to its name.
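If you need to check or switch the format, a couple of standard MySQL commands will do (the binlog itself is enabled via the log_bin option in the server config; the runtime switch below only works if the binlog is already on):

SHOW VARIABLES LIKE 'binlog_format'; -- should return ROW
SET GLOBAL binlog_format = 'ROW';    -- affects new connections only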
Slightly more info here or here.
The article has two parts:
1. How to quickly start processing the records that arrive in the log.
2. How to customize and expand what is under the hood.
Part 1. Starting as quickly as possible.
To work with the binlog, we will use the github.com/siddontang/go-mysql library.
Let's connect to a new canal (to work with canal, the ROW format is required for the binlog):
func binLogListener() {
    c, err := getDefaultCanal()
    if err == nil {
        coords, err := c.GetMasterPos()
        if err == nil {
            c.SetEventHandler(&binlogHandler{})
            c.RunFrom(coords)
        }
    }
}

func getDefaultCanal() (*canal.Canal, error) {
    cfg := canal.NewDefaultConfig()
    cfg.Addr = fmt.Sprintf("%s:%d", "127.0.0.1", 3306)
    cfg.User = "root"
    cfg.Password = "root"
    cfg.Flavor = "mysql"
    cfg.Dump.ExecutionPath = "" // an empty path disables the initial mysqldump

    return canal.NewCanal(cfg)
}
Create a wrapper over binlog:
type binlogHandler struct {
    canal.DummyEventHandler // default handler from the library
    BinlogParser            // our custom helper for parsing rows
}

func (h *binlogHandler) OnRow(e *canal.RowsEvent) error { return nil }
func (h *binlogHandler) String() string                 { return "binlogHandler" }
Now let's expand the logic of working with the received binlog rows by adding it to the OnRow() method:
func (h *binlogHandler) OnRow(e *canal.RowsEvent) error {
    var n int // starting row index
    var k int // step between rows

    switch e.Action {
    case canal.DeleteAction:
        return nil // deletes are not handled
    case canal.UpdateAction:
        n = 1
        k = 2
    case canal.InsertAction:
        n = 0
        k = 1
    }

    for i := n; i < len(e.Rows); i += k {
        key := e.Table.Schema + "." + e.Table.Name
        switch key {
        case User{}.SchemaName() + "." + User{}.TableName():
            // placeholder: parsing of the User row will go here
        }
    }

    return nil
}
The essence of this wrapper is to parse the incoming data. For an update, the data arrives as two rows per changed record: the first row contains the original data, the second the updated data. We also immediately account for multi-inserts and multi-updates, where we need to take every second row for an UPDATE; that is what n and k in the example above are for. For example, a multi-update of two records produces four rows in e.Rows (old1, new1, old2, new2), so we read indexes 1 and 3.
Let's make a model that will receive the data from the binlog. We will read data from the received rows into it; in the annotations we specify the column names:
type User struct {
    Id      int       `gorm:"column:id"`
    Name    string    `gorm:"column:name"`
    Status  string    `gorm:"column:status"`
    Created time.Time `gorm:"column:created"`
}

func (User) TableName() string {
    return "User"
}

func (User) SchemaName() string {
    return "Test"
}
The table structure in MySQL:
CREATE TABLE Test.User (
    id      INT AUTO_INCREMENT PRIMARY KEY,
    name    VARCHAR(40) NULL,
    status  ENUM("active","deleted") DEFAULT "active",
    created TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB;
Now for the parsing itself. In the placeholder we left for parsing the data, add:
user := User{}
h.GetBinLogData(&user, e, i)
In fact, this is already enough: the data of the new record will be in the user model. For clarity, let's print it:
if e.Action == canal.UpdateAction {
    oldUser := User{}
    h.GetBinLogData(&oldUser, e, i-1)
    fmt.Printf("User %d is updated from name %s to name %s\n", user.Id, oldUser.Name, user.Name)
} else {
    fmt.Printf("User %d is created with name %s\n", user.Id, user.Name)
}
Now for the moment we have been working towards: launching our "Hello binlog world":
func main() {
    go binLogListener()
    // the rest of your application goes here;
    // block so the listener keeps running
    select {}
}
Next, insert and update a row:
INSERT INTO Test.User (`id`,`name`) VALUE (1,"Jack");
UPDATE Test.User SET name="Jonh" WHERE id=1;
We'll see:
User 1 is created with name Jack
User 1 is updated from name Jack to name Jonh
The resulting code connects to the binlog and parses new rows. When a record from the table we are interested in arrives, the code reads the data into the structure and prints the result. What stayed behind the scenes is the data parser (BinlogParser) that fills the model.
Part 2. As Cobb said, we need a level below.
Consider the inner workings of the parser, which is based on reflection.
To populate the model with data, we used the handler method:
h.GetBinLogData(&user, e, i)
It parses simple data types:
- bool
- int
- float64
- string
- time.Time
It can also parse complex structures stored as JSON.
If supported types are not enough for you or you just want to understand how binlog parsing works, then you can practice adding your own types.
First, let's look at how a model field is populated with data, using the Id field of type int as an example:
type User struct {
    Id int `gorm:"column:id"`
}
Through reflection we get the type name. The parseTagSetting method converts annotations into a more convenient structure:
element := User{} // the model to be filled

v := reflect.ValueOf(element)
s := reflect.Indirect(v)
t := s.Type()

num := t.NumField()
for k := 0; k < num; k++ {
    parsedTag := parseTagSetting(t.Field(k).Tag)
    columnName, ok := parsedTag["COLUMN"]
    if !ok || columnName == "COLUMN" {
        continue
    }

    name := s.Field(k).Type().Name()
    switch name {
    case "int":
        // the int value will be set here
    }
}
Having received the int type, you can set its value through the reflection method:
func (v Value) SetInt(x int64) {
Method for parsing annotations:
func parseTagSetting(tags reflect.StructTag) map[string]string {
    setting := map[string]string{}
    for _, str := range []string{tags.Get("sql"), tags.Get("gorm")} {
        tags := strings.Split(str, ";")
        for _, value := range tags {
            v := strings.Split(value, ":")
            k := strings.TrimSpace(strings.ToUpper(v[0]))
            if len(v) >= 2 {
                setting[k] = strings.Join(v[1:], ":")
            } else {
                setting[k] = k
            }
        }
    }
    return setting
}
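To see what this gives, here is a hypothetical usage snippet (not from the article): feeding it the gorm tag that is used later for JSON fields yields, among other entries, the two keys the parser relies on:

tag := reflect.StructTag(`gorm:"column:struct_data;fromJson"`)
parsed := parseTagSetting(tag)
// parsed["COLUMN"]   == "struct_data"
// parsed["FROMJSON"] == "FROMJSON"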
SetInt takes an int64 as input. Let's make a method that extracts the data from the binlog row as an int64:
func (m *BinlogParser) intHelper(e *canal.RowsEvent, n int, columnName string) int64 {
    columnId := m.getBinlogIdByName(e, columnName)
    if e.Table.Columns[columnId].Type != schema.TYPE_NUMBER {
        return 0
    }

    switch e.Rows[n][columnId].(type) {
    case int8:
        return int64(e.Rows[n][columnId].(int8))
    case int32:
        return int64(e.Rows[n][columnId].(int32))
    case int64:
        return e.Rows[n][columnId].(int64)
    case int:
        return int64(e.Rows[n][columnId].(int))
    case uint8:
        return int64(e.Rows[n][columnId].(uint8))
    case uint16:
        return int64(e.Rows[n][columnId].(uint16))
    case uint32:
        return int64(e.Rows[n][columnId].(uint32))
    case uint64:
        return int64(e.Rows[n][columnId].(uint64))
    case uint:
        return int64(e.Rows[n][columnId].(uint))
    }
    return 0
}
Everything looks logical except for the getBinlogIdByName() method.
This trivial helper (a possible implementation is sketched right after the list below) is needed to work with column names instead of their ordinal numbers, which allows us to:
- take the column names from the gorm annotations;
- avoid edits when a column is added at the beginning or in the middle of the table;
- work with the name field, which is more convenient than working with column number 3.
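The article does not show the helper itself, so here is a minimal sketch of how it might look; it simply scans the column list that canal exposes in e.Table.Columns (the panic on an unknown name is purely for illustration):

func (m *BinlogParser) getBinlogIdByName(e *canal.RowsEvent, name string) int {
    for id, value := range e.Table.Columns {
        if value.Name == name {
            return id
        }
    }

    panic(fmt.Sprintf("There is no column %s in table %s.%s", name, e.Table.Schema, e.Table.Name))
}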
As a result, add the handler itself:
s.Field(k).SetInt(m.intHelper(e, n, columnName))
Let's consider two more examples.
ENUM: here the values arrive as an index, that is, the status "active" will arrive as 1. In most cases we need the string representation of the enum; it can be obtained from the field description. Note that enum values in the binlog are numbered from 1, while the array of possible values starts from 0.
The Enum handler might look like this:
func (m *BinlogParser) stringHelper(e *canal.RowsEvent, n int, columnName string) string {
    columnId := m.getBinlogIdByName(e, columnName)
    if e.Table.Columns[columnId].Type == schema.TYPE_ENUM {
        values := e.Table.Columns[columnId].EnumValues // the list of possible values
        if len(values) == 0 || e.Rows[n][columnId] == nil {
            return ""
        }

        return values[e.Rows[n][columnId].(int64)-1] // the index in the binlog starts from 1, the slice from 0
    }
    // ... handling of ordinary string columns
    return ""
}
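By analogy with the int field, the string value would then be written into the model with the corresponding reflect setter; this wiring is not shown explicitly in the article, but presumably looks like:

s.Field(k).SetString(m.stringHelper(e, n, columnName))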
What if I want to store JSON? Good idea, why not. From the point of view of MySQL, JSON is a string. We need to somehow indicate that this data is serialized, so we add a non-canonical "fromJson" annotation to the gorm tag.
Imagine that we need to handle a structure like this:
type JsonData struct {
    Int        int               `gorm:"column:int"`
    StructData TestData          `gorm:"column:struct_data;fromJson"`
    MapData    map[string]string `gorm:"column:map_data;fromJson"`
    SliceData  []int             `gorm:"column:slice_data;fromJson"`
}

type TestData struct {
    Test string `json:"test"`
    Int  int    `json:"int"`
}
You could write a lot of conditions and probably succeed, but every new data type would kill that effort. Besides, attempts to find an answer on Stack Overflow to "how to deserialize into a struct of unknown type" start with the phrase: "It is not clear why you need this, but try...".
By converting the needed type to an interface, we can do it:
if _, ok := parsedTag["FROMJSON"]; ok {
    newObject := reflect.New(s.Field(k).Type()).Interface()
    json := m.stringHelper(e, n, columnName)
    jsoniter.Unmarshal([]byte(json), &newObject)

    s.Field(k).Set(reflect.ValueOf(newObject).Elem().Convert(s.Field(k).Type()))
}
If you have questions about data types, you can see the tests or ask in the comments.
What happened in the end.