Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
9b2a04d
Merge pull request #2 from openark/workflow-upload-artifact
shlomi-noach Jul 28, 2020
9ccde4f
Merge pull request #5 from openark/parse-alter-statement
shlomi-noach Jul 29, 2020
b59a8ed
merged conflict
shlomi-noach Aug 2, 2020
6012e80
Merge pull request #8 from openark/ajm188-handle_driver_timeout_error
shlomi-noach Aug 2, 2020
ae22d84
v1.1.0
shlomi-noach Aug 5, 2020
ca0ca5a
Merge remote-tracking branch 'upstream/master' into updates-from-upst…
shlomi-noach Oct 18, 2020
e9f9af2
Merge pull request #11 from openark/updates-from-upstream-2020-10
shlomi-noach Oct 18, 2020
294d43b
WIP: copying AUTO_INCREMENT value to ghost table
shlomi-noach Dec 31, 2020
26f7602
greping for 'expect_table_structure' content
shlomi-noach Dec 31, 2020
75009db
Adding simple test for 'expect_table_structure' scenario
shlomi-noach Dec 31, 2020
eeab264
adding tests for AUTO_INCREMENT value after row deletes. Should initi…
shlomi-noach Dec 31, 2020
2d0281f
clear event beforehand
shlomi-noach Dec 31, 2020
af20211
parsing AUTO_INCREMENT from alter query, reading AUTO_INCREMENT from …
shlomi-noach Dec 31, 2020
31069ae
support GetUint64
shlomi-noach Dec 31, 2020
3d4dfaa
minor update to test
shlomi-noach Dec 31, 2020
63219ab
adding test for user defined AUTO_INCREMENT statement
shlomi-noach Dec 31, 2020
525a80d
Merge branch 'master' into copy-auto-increment
shlomi-noach Jan 5, 2021
ff82140
Merge pull request #12 from openark/copy-auto-increment
shlomi-noach Jan 5, 2021
7202076
Generated column as part of UNIQUE (or PRIMARY) KEY
shlomi-noach Jan 19, 2021
b7b3bfb
skip analysis of generated column data type in unique key
shlomi-noach Jan 19, 2021
253658d
Merge pull request #13 from openark/unique-key-generated-column
shlomi-noach Jan 27, 2021
4a36e24
Merge pull request #14 from ccoffey/cathal/safer_cut_over
shlomi-noach Feb 7, 2021
710c9dd
All MySQL DBs limited to max 3 concurrent/idle connections
shlomi-noach Feb 18, 2021
dea8d54
Merge branch 'master' into limit-mysql-connetions
shlomi-noach Feb 22, 2021
2b5cf78
Merge pull request #15 from openark/limit-mysql-connetions
shlomi-noach Feb 22, 2021
54000ab
hooks: reporting GH_OST_ETA_SECONDS. ETA stored as part of migration …
shlomi-noach Mar 7, 2021
51719a2
GH_OST_ETA_NANOSECONDS
shlomi-noach Mar 7, 2021
76b9c16
N/A denoted by negative value
shlomi-noach Mar 7, 2021
b688c58
ETAUnknown constant
shlomi-noach Mar 7, 2021
33516f4
Merge pull request #17 from openark/hooks-eta-seconds
shlomi-noach Mar 7, 2021
c1bfe94
Convering enum to varchar
shlomi-noach May 2, 2021
9bb2daa
test: not null
shlomi-noach May 2, 2021
939b898
first attempt at setting enum-to-string right
shlomi-noach May 2, 2021
95ee9e2
fix insert query
shlomi-noach May 2, 2021
e80ddb4
store enum values, use when populating
shlomi-noach May 2, 2021
6e5b665
apply EnumValues to mapped column
shlomi-noach May 2, 2021
82bdf06
fix compilation error
shlomi-noach May 2, 2021
91dec1a
Merge branch 'master' into enum-to-varchar
shlomi-noach May 4, 2021
48b0283
gofmt
shlomi-noach May 4, 2021
c90556e
Merge branch 'master' into enum-to-varchar
shlomi-noach May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64
)

var (
Expand Down Expand Up @@ -182,6 +183,7 @@ type MigrationContext struct {
lastHeartbeatOnChangelogMutex *sync.Mutex
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
ThrottleHTTPStatusCode int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
Expand Down Expand Up @@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext {
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
Expand Down Expand Up @@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) {
atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct))
}

func (this *MigrationContext) GetETADuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.etaNanoseonds))
}

func (this *MigrationContext) SetETADuration(etaDuration time.Duration) {
atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds())
}

func (this *MigrationContext) GetETASeconds() int64 {
nano := atomic.LoadInt64(&this.etaNanoseonds)
if nano < 0 {
return ETAUnknown
}
return nano / int64(time.Second)
}

// math.Float64bits([f=0..100])

// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
Expand Down
1 change: 1 addition & 0 deletions go/logic/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
Expand Down
5 changes: 5 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
if column.Name == mappedColumn.Name && column.Type == sql.DateTimeColumnType && mappedColumn.Type == sql.TimestampColumnType {
this.migrationContext.MappedSharedColumns.SetConvertDatetimeToTimestamp(column.Name, this.migrationContext.ApplierTimeZone)
}
if column.Name == mappedColumn.Name && column.Type == sql.EnumColumnType && mappedColumn.Charset != "" {
this.migrationContext.MappedSharedColumns.SetEnumToTextConversion(column.Name)
this.migrationContext.MappedSharedColumns.SetEnumValues(column.Name, column.EnumValues)
}
}

for _, column := range this.migrationContext.UniqueKey.Columns.Columns() {
Expand Down Expand Up @@ -590,6 +594,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
}
if strings.HasPrefix(columnType, "enum") {
column.Type = sql.EnumColumnType
column.EnumValues = sql.ParseEnumValues(m.GetString("COLUMN_TYPE"))
}
if strings.HasPrefix(columnType, "binary") {
column.Type = sql.BinaryColumnType
Expand Down
19 changes: 14 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
}

var etaSeconds float64 = math.MaxFloat64
eta := "N/A"
var etaDuration = time.Duration(base.ETAUnknown)
if progressPct >= 100.0 {
eta = "due"
etaDuration = 0
} else if progressPct >= 0.1 {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time.Duration(etaSeconds) * time.Second
eta = base.PrettifyDurationOutput(etaDuration)
etaDuration = time.Duration(etaSeconds) * time.Second
} else {
eta = "due"
etaDuration = 0
}
}
this.migrationContext.SetETADuration(etaDuration)
var eta string
switch etaDuration {
case 0:
eta = "due"
case time.Duration(base.ETAUnknown):
eta = "N/A"
default:
eta = base.PrettifyDurationOutput(etaDuration)
}

state := "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
Expand Down
7 changes: 5 additions & 2 deletions go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ func (this *Throttler) collectControlReplicasLag() {
dbUri := connectionConfig.GetDBUri("information_schema")

var heartbeatValue string
if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil {
db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri)
if err != nil {
return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
}

if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}

Expand Down
30 changes: 16 additions & 14 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"github.com/outbrain/golib/sqlutils"
)

const MaxTableNameLength = 64
const MaxReplicationPasswordLength = 32
const (
MaxTableNameLength = 64
MaxReplicationPasswordLength = 32
MaxDBPoolConnections = 3
)

type ReplicationLagResult struct {
Key InstanceKey
Expand All @@ -39,23 +42,22 @@ func (this *ReplicationLagResult) HasLag() bool {
var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
var knownDBsMutex = &sync.Mutex{}

func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, err error) {
cacheKey := migrationUuid + ":" + mysql_uri

knownDBsMutex.Lock()
defer func() {
knownDBsMutex.Unlock()
}()

var exists bool
if _, exists = knownDBs[cacheKey]; !exists {
if db, err := gosql.Open("mysql", mysql_uri); err == nil {
knownDBs[cacheKey] = db
} else {
return db, exists, err
defer knownDBsMutex.Unlock()

if db, exists = knownDBs[cacheKey]; !exists {
db, err = gosql.Open("mysql", mysql_uri)
if err != nil {
return nil, false, err
}
db.SetMaxOpenConns(MaxDBPoolConnections)
db.SetMaxIdleConns(MaxDBPoolConnections)
knownDBs[cacheKey] = db
}
return knownDBs[cacheKey], exists, nil
return db, exists, nil
}

// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
Expand Down
4 changes: 4 additions & 0 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func buildColumnsPreparedValues(columns *ColumnList) []string {
var token string
if column.timezoneConversion != nil {
token = fmt.Sprintf("convert_tz(?, '%s', '%s')", column.timezoneConversion.ToTimezone, "+00:00")
} else if column.enumToTextConversion {
token = fmt.Sprintf("ELT(?, %s)", column.EnumValues)
} else if column.Type == JSONColumnType {
token = "convert(? using utf8mb4)"
} else {
Expand Down Expand Up @@ -108,6 +110,8 @@ func BuildSetPreparedClause(columns *ColumnList) (result string, err error) {
var setToken string
if column.timezoneConversion != nil {
setToken = fmt.Sprintf("%s=convert_tz(?, '%s', '%s')", EscapeName(column.Name), column.timezoneConversion.ToTimezone, "+00:00")
} else if column.enumToTextConversion {
setToken = fmt.Sprintf("%s=ELT(?, %s)", EscapeName(column.Name), column.EnumValues)
} else if column.Type == JSONColumnType {
setToken = fmt.Sprintf("%s=convert(? using utf8mb4)", EscapeName(column.Name))
} else {
Expand Down
8 changes: 8 additions & 0 deletions go/sql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
// ALTER TABLE tbl something
regexp.MustCompile(`(?i)\balter\s+table\s+([\S]+)\s+(.*$)`),
}
enumValuesRegexp = regexp.MustCompile("^enum[(](.*)[)]$")
)

type AlterTableParser struct {
Expand Down Expand Up @@ -205,3 +206,10 @@ func (this *AlterTableParser) HasExplicitTable() bool {
func (this *AlterTableParser) GetAlterStatementOptions() string {
return this.alterStatementOptions
}

func ParseEnumValues(enumColumnType string) string {
if submatch := enumValuesRegexp.FindStringSubmatch(enumColumnType); len(submatch) > 0 {
return submatch[1]
}
return enumColumnType
}
18 changes: 18 additions & 0 deletions go/sql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,21 @@ func TestParseAlterStatementExplicitTable(t *testing.T) {
test.S(t).ExpectTrue(reflect.DeepEqual(parser.alterTokens, []string{"drop column b", "add index idx(i)"}))
}
}

func TestParseEnumValues(t *testing.T) {
{
s := "enum('red','green','blue','orange')"
values := ParseEnumValues(s)
test.S(t).ExpectEquals(values, "'red','green','blue','orange'")
}
{
s := "('red','green','blue','orange')"
values := ParseEnumValues(s)
test.S(t).ExpectEquals(values, "('red','green','blue','orange')")
}
{
s := "zzz"
values := ParseEnumValues(s)
test.S(t).ExpectEquals(values, "zzz")
}
}
27 changes: 20 additions & 7 deletions go/sql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@ type TimezoneConversion struct {
}

type Column struct {
Name string
IsUnsigned bool
Charset string
Type ColumnType

Name string
IsUnsigned bool
Charset string
Type ColumnType
EnumValues string
timezoneConversion *TimezoneConversion
enumToTextConversion bool
// add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog.
// https://github.com/github/gh-ost/issues/909
BinaryOctetLength uint
timezoneConversion *TimezoneConversion
BinaryOctetLength uint
}

func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} {
Expand Down Expand Up @@ -198,6 +199,18 @@ func (this *ColumnList) HasTimezoneConversion(columnName string) bool {
return this.GetColumn(columnName).timezoneConversion != nil
}

func (this *ColumnList) SetEnumToTextConversion(columnName string) {
this.GetColumn(columnName).enumToTextConversion = true
}

func (this *ColumnList) IsEnumToTextConversion(columnName string) bool {
return this.GetColumn(columnName).enumToTextConversion
}

func (this *ColumnList) SetEnumValues(columnName string, enumValues string) {
this.GetColumn(columnName).EnumValues = enumValues
}

func (this *ColumnList) String() string {
return strings.Join(this.Names(), ",")
}
Expand Down
26 changes: 26 additions & 0 deletions localtests/enum-to-varchar/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
e enum('red', 'green', 'blue', 'orange') null default null collate 'utf8_bin',
primary key(id)
) auto_increment=1;

insert into gh_ost_test values (null, 7, 'red');

drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 11, 'red');
insert into gh_ost_test values (null, 13, 'green');
insert into gh_ost_test values (null, 17, 'blue');
set @last_insert_id := last_insert_id();
update gh_ost_test set e='orange' where id = @last_insert_id;
end ;;
1 change: 1 addition & 0 deletions localtests/enum-to-varchar/extra_args
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--alter="change e e varchar(32) not null default ''"