Skip to content

Commit e3210a9

Browse files
author
Shlomi Noach
committed
Merge pull request #4 from github/parsing-mysqlbinlog
further work on mysqlbinlog_reader
2 parents e088a3a + 30a472f commit e3210a9

File tree

10 files changed

+503
-74
lines changed

10 files changed

+503
-74
lines changed

build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#!/bin/bash
2+
#
3+
#
24

35
buildpath=/tmp/gh-osc
46
target=gh-osc

go/binlog/binlog.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2015 Shlomi Noach, courtesy Booking.com
2+
Copyright 2015 Shlomi Noach
33
*/
44

55
package binlog
@@ -11,8 +11,10 @@ import (
1111
"strings"
1212
)
1313

14+
// BinlogType identifies the type of the log: relay or binary log
1415
type BinlogType int
1516

17+
// BinaryLog, RelayLog are binlog types
1618
const (
1719
BinaryLog BinlogType = iota
1820
RelayLog

go/binlog/binlog_entry.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
4+
*/
5+
6+
package binlog
7+
8+
// BinlogEntry describes an entry in the binary log
9+
type BinlogEntry struct {
10+
LogPos uint64
11+
EndLogPos uint64
12+
StatementType string // INSERT, UPDATE, DELETE
13+
DatabaseName string
14+
TableName string
15+
PositionalColumns map[uint64]interface{}
16+
}
17+
18+
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
19+
func NewBinlogEntry() *BinlogEntry {
20+
binlogEntry := &BinlogEntry{}
21+
binlogEntry.PositionalColumns = make(map[uint64]interface{})
22+
return binlogEntry
23+
}
24+
25+
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
26+
func (this *BinlogEntry) Duplicate() *BinlogEntry {
27+
binlogEntry := NewBinlogEntry()
28+
binlogEntry.LogPos = this.LogPos
29+
binlogEntry.EndLogPos = this.EndLogPos
30+
return binlogEntry
31+
}

go/binlog/binlog_reader.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
/*
2-
Copyright 2016 GitHub Inc.
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
34
*/
45

56
package binlog
67

7-
type BinlogEntry struct {
8-
LogPos uint64
9-
EndLogPos uint64
10-
StatementType string // INSERT, UPDATE, DELETE
11-
DatabaseName string
12-
TableName string
13-
}
14-
8+
// BinlogReader is a general interface whose implementations can choose their methods of reading
9+
// a binary log file and parsing it into binlog entries
1510
type BinlogReader interface {
1611
ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error)
1712
}

go/binlog/binlog_test.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,15 @@
11
/*
2-
Copyright 2014 Outbrain Inc.
3-
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
154
*/
165

176
package binlog
187

198
import (
9+
"testing"
10+
2011
"github.com/outbrain/golib/log"
2112
test "github.com/outbrain/golib/tests"
22-
"testing"
2313
)
2414

2515
func init() {

go/binlog/mysqlbinlog_reader.go

Lines changed: 137 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/*
2-
Copyright 2016 GitHub Inc.
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
34
*/
45

56
package binlog
@@ -23,6 +24,20 @@ var (
2324
startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$")
2425
endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
2526
statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
27+
tokenRegxp = regexp.MustCompile("### (WHERE|SET)$")
28+
positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$")
29+
)
30+
31+
// BinlogEntryState is a state in the binlog parser automaton / state machine
32+
type BinlogEntryState string
33+
34+
// States of the state machine
35+
const (
36+
InvalidState BinlogEntryState = "InvalidState"
37+
SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
38+
ExpectEndLogPosState = "ExpectEndLogPosState"
39+
ExpectTokenState = "ExpectTokenState"
40+
PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
2641
)
2742

2843
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
@@ -33,6 +48,7 @@ type MySQLBinlogReader struct {
3348
MySQLBinlogBinary string
3449
}
3550

51+
// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem
3652
func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) {
3753
mySQLBinlogReader = &MySQLBinlogReader{
3854
Basedir: basedir,
@@ -61,7 +77,8 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop
6177
if err != nil {
6278
return entries, log.Errore(err)
6379
}
64-
chunkEntries, err := parseEntries(entriesBytes)
80+
81+
chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)))
6582
if err != nil {
6683
return entries, log.Errore(err)
6784
}
@@ -77,64 +94,136 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop
7794
return entries, err
7895
}
7996

80-
func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) {
81-
scanner := bufio.NewScanner(bytes.NewReader(entriesBytes))
82-
expectEndLogPos := false
83-
var startLogPos uint64
84-
var endLogPos uint64
97+
// automaton step: accept either beginning of new entry, or beginning of new statement
98+
func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) {
99+
onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
100+
startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64)
85101

86-
binlogEntry := &BinlogEntry{}
102+
if previousEndLogPos != 0 && startLogPos != previousEndLogPos {
103+
return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos)
104+
}
105+
nextBinlogEntry = binlogEntry
106+
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
107+
// Current entry is already a true entry, with startpos and with statement
108+
nextBinlogEntry = NewBinlogEntry()
109+
}
87110

88-
for scanner.Scan() {
89-
line := scanner.Text()
111+
nextBinlogEntry.LogPos = startLogPos
112+
return ExpectEndLogPosState, nextBinlogEntry, nil
113+
}
90114

91-
onStartEntry := func(submatch []string) error {
92-
startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
115+
onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
116+
nextBinlogEntry = binlogEntry
117+
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
118+
// Current entry is already a true entry, with startpos and with statement
119+
nextBinlogEntry = binlogEntry.Duplicate()
120+
}
93121

94-
if endLogPos != 0 && startLogPos != endLogPos {
95-
return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos)
96-
}
97-
// We are entering a new entry, let's push the previous one
98-
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
99-
entries = append(entries, binlogEntry)
100-
log.Debugf("entry: %+v", *binlogEntry)
101-
binlogEntry = &BinlogEntry{}
102-
}
122+
nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
123+
nextBinlogEntry.DatabaseName = submatch[2]
124+
nextBinlogEntry.TableName = submatch[3]
103125

104-
//log.Debugf(line)
105-
binlogEntry.LogPos = startLogPos
106-
// Next iteration we will read the end_log_pos
107-
expectEndLogPos = true
126+
return ExpectTokenState, nextBinlogEntry, nil
127+
}
108128

109-
return nil
129+
onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
130+
columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
131+
if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
132+
return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.StatementType)
110133
}
111-
if expectEndLogPos {
112-
submatch := endLogPosRegexp.FindStringSubmatch(line)
113-
if len(submatch) <= 1 {
114-
return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos)
115-
}
116-
endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
134+
columnValue := submatch[2]
135+
columnValue = strings.TrimPrefix(columnValue, "'")
136+
columnValue = strings.TrimSuffix(columnValue, "'")
137+
binlogEntry.PositionalColumns[columnIndex] = columnValue
117138

118-
binlogEntry.EndLogPos = endLogPos
119-
expectEndLogPos = false
120-
} else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
121-
if err := onStartEntry(submatch); err != nil {
122-
return entries, log.Errore(err)
123-
}
124-
} else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
125-
if err := onStartEntry(submatch); err != nil {
126-
return entries, log.Errore(err)
127-
}
128-
} else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
129-
binlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
130-
binlogEntry.DatabaseName = submatch[2]
131-
binlogEntry.TableName = submatch[3]
132-
}
139+
return SearchForStartPosOrStatementState, binlogEntry, nil
140+
}
141+
142+
line := scanner.Text()
143+
if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
144+
return onStartEntry(submatch)
145+
}
146+
if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
147+
return onStartEntry(submatch)
148+
}
149+
if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
150+
return onStatementEntry(submatch)
151+
}
152+
if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 {
153+
return onPositionalColumn(submatch)
154+
}
155+
// Haven't found a match
156+
return SearchForStartPosOrStatementState, binlogEntry, nil
157+
}
158+
159+
// automaton step: expect an end_log_pos line
160+
func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
161+
line := scanner.Text()
162+
163+
submatch := endLogPosRegexp.FindStringSubmatch(line)
164+
if len(submatch) > 1 {
165+
binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
166+
return SearchForStartPosOrStatementState, nil
167+
}
168+
return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos)
169+
}
133170

171+
// automaton step: a not-strictly-required but good-to-have-around validation that
172+
// we see an expected token following a statement
173+
func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
174+
line := scanner.Text()
175+
if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 {
176+
return SearchForStartPosOrStatementState, nil
134177
}
135-
if binlogEntry.LogPos != 0 {
178+
return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos)
179+
}
180+
181+
// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
182+
// It issues an automaton / state machine to do its thang.
183+
func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) {
184+
binlogEntry := NewBinlogEntry()
185+
var state BinlogEntryState = SearchForStartPosOrStatementState
186+
var endLogPos uint64
187+
188+
appendBinlogEntry := func() {
189+
if binlogEntry.LogPos == 0 {
190+
return
191+
}
192+
if binlogEntry.StatementType == "" {
193+
return
194+
}
136195
entries = append(entries, binlogEntry)
137196
log.Debugf("entry: %+v", *binlogEntry)
197+
fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName))
198+
}
199+
for scanner.Scan() {
200+
switch state {
201+
case SearchForStartPosOrStatementState:
202+
{
203+
var nextBinlogEntry *BinlogEntry
204+
state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos)
205+
if nextBinlogEntry != binlogEntry {
206+
appendBinlogEntry()
207+
binlogEntry = nextBinlogEntry
208+
}
209+
}
210+
case ExpectEndLogPosState:
211+
{
212+
state, err = expectEndLogPos(scanner, binlogEntry)
213+
}
214+
case ExpectTokenState:
215+
{
216+
state, err = expectToken(scanner, binlogEntry)
217+
}
218+
default:
219+
{
220+
err = fmt.Errorf("Unexpected state %+v", state)
221+
}
222+
}
223+
if err != nil {
224+
return entries, log.Errore(err)
225+
}
138226
}
227+
appendBinlogEntry()
139228
return entries, err
140229
}

go/binlog/mysqlbinlog_reader_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
4+
*/
5+
6+
package binlog
7+
8+
import (
9+
"bufio"
10+
"os"
11+
"testing"
12+
13+
"github.com/outbrain/golib/log"
14+
test "github.com/outbrain/golib/tests"
15+
)
16+
17+
func init() {
18+
log.SetLevel(log.ERROR)
19+
}
20+
21+
func TestRBRSample0(t *testing.T) {
22+
testFile, err := os.Open("testdata/rbr-sample-0.txt")
23+
test.S(t).ExpectNil(err)
24+
defer testFile.Close()
25+
26+
scanner := bufio.NewScanner(testFile)
27+
entries, err := parseEntries(scanner)
28+
test.S(t).ExpectNil(err)
29+
30+
test.S(t).ExpectEquals(len(entries), 17)
31+
test.S(t).ExpectEquals(entries[0].DatabaseName, "test")
32+
test.S(t).ExpectEquals(entries[0].TableName, "samplet")
33+
test.S(t).ExpectEquals(entries[0].StatementType, "INSERT")
34+
test.S(t).ExpectEquals(entries[1].StatementType, "INSERT")
35+
test.S(t).ExpectEquals(entries[2].StatementType, "INSERT")
36+
test.S(t).ExpectEquals(entries[3].StatementType, "INSERT")
37+
test.S(t).ExpectEquals(entries[4].StatementType, "INSERT")
38+
test.S(t).ExpectEquals(entries[5].StatementType, "INSERT")
39+
test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE")
40+
test.S(t).ExpectEquals(entries[7].StatementType, "DELETE")
41+
test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE")
42+
test.S(t).ExpectEquals(entries[9].StatementType, "INSERT")
43+
test.S(t).ExpectEquals(entries[10].StatementType, "INSERT")
44+
test.S(t).ExpectEquals(entries[11].StatementType, "DELETE")
45+
test.S(t).ExpectEquals(entries[12].StatementType, "DELETE")
46+
test.S(t).ExpectEquals(entries[13].StatementType, "INSERT")
47+
test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE")
48+
test.S(t).ExpectEquals(entries[15].StatementType, "DELETE")
49+
test.S(t).ExpectEquals(entries[16].StatementType, "INSERT")
50+
}

0 commit comments

Comments
 (0)