Skip to content

Commit 20a74d5

Browse files
author
Shlomi Noach
committed
Merge pull request #6 from github/experimenting-go-mysql
merging so I can use this on other branches
2 parents e3210a9 + 8f3d13e commit 20a74d5

File tree

7 files changed

+181
-3
lines changed

7 files changed

+181
-3
lines changed

go/binlog/gomysql_reader.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
4+
*/
5+
6+
package binlog
7+
8+
import (
9+
"fmt"
10+
"os"
11+
"reflect"
12+
"strings"
13+
14+
"github.com/github/gh-osc/go/mysql"
15+
"github.com/outbrain/golib/log"
16+
gomysql "github.com/siddontang/go-mysql/mysql"
17+
"github.com/siddontang/go-mysql/replication"
18+
)
19+
20+
var ()
21+
22+
const (
23+
serverId = 99999
24+
)
25+
26+
type GoMySQLReader struct {
27+
connectionConfig *mysql.ConnectionConfig
28+
binlogSyncer *replication.BinlogSyncer
29+
}
30+
31+
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
32+
binlogReader = &GoMySQLReader{
33+
connectionConfig: connectionConfig,
34+
}
35+
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
36+
37+
// Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
38+
err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Hostname, uint16(connectionConfig.Port), connectionConfig.User, connectionConfig.Password)
39+
if err != nil {
40+
return binlogReader, err
41+
}
42+
43+
return binlogReader, err
44+
}
45+
46+
func (this *GoMySQLReader) isDMLEvent(event *replication.BinlogEvent) bool {
47+
eventType := event.Header.EventType.String()
48+
if strings.HasPrefix(eventType, "WriteRows") {
49+
return true
50+
}
51+
if strings.HasPrefix(eventType, "UpdateRows") {
52+
return true
53+
}
54+
if strings.HasPrefix(eventType, "DeleteRows") {
55+
return true
56+
}
57+
return false
58+
}
59+
60+
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
61+
func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) {
62+
// Start sync with sepcified binlog file and position
63+
streamer, err := this.binlogSyncer.StartSync(gomysql.Position{logFile, uint32(startPos)})
64+
if err != nil {
65+
return entries, err
66+
}
67+
68+
for {
69+
ev, err := streamer.GetEvent()
70+
if err != nil {
71+
return entries, err
72+
}
73+
if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
74+
if true {
75+
fmt.Println(ev.Header.EventType)
76+
fmt.Println(len(rowsEvent.Rows))
77+
78+
for _, rows := range rowsEvent.Rows {
79+
for j, d := range rows {
80+
if _, ok := d.([]byte); ok {
81+
fmt.Print(fmt.Sprintf("yesbin %d:%q, %+v\n", j, d, reflect.TypeOf(d)))
82+
} else {
83+
fmt.Print(fmt.Sprintf("notbin %d:%#v, %+v\n", j, d, reflect.TypeOf(d)))
84+
}
85+
}
86+
fmt.Println("---")
87+
}
88+
} else {
89+
ev.Dump(os.Stdout)
90+
}
91+
}
92+
}
93+
log.Debugf("done")
94+
return entries, err
95+
}

go/binlog/testdata/mysql-bin.000066

3.6 KB
Binary file not shown.

go/binlog/testdata/mysql-bin.000070

4.16 KB
Binary file not shown.

go/binlog/testdata/rbr-sample-1.txt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
these are the statements that were used to execute the RBR log:
3+
4+
drop table if exists samplet;
5+
create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb;
6+
insert into samplet values (1,1,'a');
7+
insert into samplet values (2,2,'extended'),(3,3,'extended');
8+
begin;
9+
insert into samplet values (4,4,'transaction');
10+
insert into samplet values (5,5,'transaction');
11+
insert into samplet values (6,6,'transaction');
12+
commit;
13+
update samplet set name='update' where id=5;
14+
replace into samplet values (2,4,'replaced 2,4');
15+
insert into samplet values (7,7,'7');
16+
insert into samplet values (8,8,'8');
17+
delete from samplet where id >= 7;
18+
insert into samplet values (9,9,'9');
19+
begin;
20+
update samplet set name='update 9' where id=9;
21+
delete from samplet where license=3;
22+
insert into samplet values (10,10,'10');
23+
commit;
24+
update samplet set name='update 5,6' where id in (5,6);
25+
begin;
26+
delete from samplet where id=5;
27+
rollback;
28+
*/

go/binlog/testdata/rbr-sample-2.txt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
drop table if exists samplet;
2+
create table samplet(id int primary key, license int, name varchar(64), b tinyblob, unique key license_uidx(license)) engine=innodb;
3+
insert into samplet (id, license, name) values (1,1,'a');
4+
insert into samplet (id, license, name) values (2,2,'extended'),(3,3,'extended');
5+
begin;
6+
insert into samplet (id, license, name) values (4,4,'transaction');
7+
insert into samplet (id, license, name) values (5,5,'transaction');
8+
insert into samplet (id, license, name) values (6,6,'transaction');
9+
commit;
10+
update samplet set name='update' where id=5;
11+
replace into samplet (id, license, name) values (2,4,'replaced 2,4');
12+
insert into samplet (id, license, name, b) values (7,7,'7', x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082');
13+
insert into samplet (id, license, name) values (8,8,'8');
14+
delete from samplet where id >= 7;
15+
insert into samplet (id, license, name) values (9,9,'9');
16+
begin;
17+
update samplet set name='update 9', b=x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082' where id=9;
18+
update samplet set name='update 9b' where id=9;
19+
delete from samplet where license=3;
20+
insert into samplet (id, license, name) values (10,10,'10');
21+
commit;
22+
update samplet set name='update 5,6' where id in (5,6);
23+
begin;
24+
delete from samplet where id=5;
25+
rollback;

go/cmd/gh-osc/main.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,24 @@ import (
1111
"os"
1212

1313
"github.com/github/gh-osc/go/binlog"
14+
"github.com/github/gh-osc/go/mysql"
1415
"github.com/outbrain/golib/log"
1516
)
1617

1718
// main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces.
1819
func main() {
19-
mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
20-
mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)")
20+
var connectionConfig mysql.ConnectionConfig
21+
22+
// mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
23+
// mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)")
2124
internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment")
2225
binlogFile := flag.String("binlog-file", "", "Name of binary log file")
26+
27+
flag.StringVar(&connectionConfig.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
28+
flag.IntVar(&connectionConfig.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
29+
flag.StringVar(&connectionConfig.User, "user", "root", "MySQL user")
30+
flag.StringVar(&connectionConfig.Password, "password", "", "MySQL password")
31+
2332
quiet := flag.Bool("quiet", false, "quiet")
2433
verbose := flag.Bool("verbose", false, "verbose")
2534
debug := flag.Bool("debug", false, "debug mode (very verbose)")
@@ -51,7 +60,14 @@ func main() {
5160

5261
if *internalExperiment {
5362
log.Debug("starting experiment")
54-
binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
63+
var binlogReader binlog.BinlogReader
64+
var err error
65+
66+
//binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
67+
binlogReader, err = binlog.NewGoMySQLReader(&connectionConfig)
68+
if err != nil {
69+
log.Fatale(err)
70+
}
5571
binlogReader.ReadEntries(*binlogFile, 0, 0)
5672
}
5773
}

go/mysql/connection.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
Copyright 2016 GitHub Inc.
3+
See https://github.com/github/gh-osc/blob/master/LICENSE
4+
*/
5+
6+
package mysql
7+
8+
// ConnectionConfig is the minimal configuration required to connect to a MySQL server
9+
type ConnectionConfig struct {
10+
Hostname string
11+
Port int
12+
User string
13+
Password string
14+
}

0 commit comments

Comments
 (0)