Skip to content

Commit f0738d1

Browse files
authored
VReplication: Ensure proper handling of keyspace/database names with dashes (#18762)
Signed-off-by: Matt Lord <[email protected]>
1 parent 4101355 commit f0738d1

24 files changed

+667
-622
lines changed

go/cmd/vtctldclient/command/vreplication/lookupvindex/lookupvindex.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"vitess.io/vitess/go/cmd/vtctldclient/cli"
2828
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
29+
"vitess.io/vitess/go/sqlescape"
2930

3031
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3132
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
@@ -50,7 +51,7 @@ var (
5051
}
5152

5253
baseOptions = struct {
53-
// This is where the lookup table and VReplicaiton workflow
54+
// This is where the lookup table and VReplication workflow
5455
// will be created.
5556
TableKeyspace string
5657
// This will be the name of the Lookup Vindex and the name
@@ -133,12 +134,20 @@ var (
133134
if !strings.Contains(createOptions.Type, "lookup") {
134135
return fmt.Errorf("vindex type must be a lookup vindex")
135136
}
137+
escapedTableKeyspace, err := sqlescape.EnsureEscaped(baseOptions.TableKeyspace)
138+
if err != nil {
139+
return fmt.Errorf("invalid table keyspace (%s): %v", baseOptions.TableKeyspace, err)
140+
}
141+
escapedTableName, err := sqlescape.EnsureEscaped(createOptions.TableName)
142+
if err != nil {
143+
return fmt.Errorf("invalid table name (%s): %v", createOptions.TableName, err)
144+
}
136145
baseOptions.Vschema = &vschemapb.Keyspace{
137146
Vindexes: map[string]*vschemapb.Vindex{
138147
baseOptions.Name: {
139148
Type: createOptions.Type,
140149
Params: map[string]string{
141-
"table": baseOptions.TableKeyspace + "." + createOptions.TableName,
150+
"table": escapedTableKeyspace + "." + escapedTableName,
142151
"from": strings.Join(createOptions.TableOwnerColumns, ","),
143152
"to": "keyspace_id",
144153
"ignore_nulls": fmt.Sprintf("%t", createOptions.IgnoreNulls),
@@ -204,10 +213,18 @@ var (
204213
return fmt.Errorf("%s is not a lookup vindex type", vindex.LookupVindexType)
205214
}
206215

216+
escapedTableKeyspace, err := sqlescape.EnsureEscaped(baseOptions.TableKeyspace)
217+
if err != nil {
218+
return fmt.Errorf("invalid table keyspace (%s): %v", baseOptions.TableKeyspace, err)
219+
}
220+
escapedTableName, err := sqlescape.EnsureEscaped(createOptions.TableName)
221+
if err != nil {
222+
return fmt.Errorf("invalid table name (%s): %v", vindex.TableName, err)
223+
}
207224
vindexes[vindexName] = &vschemapb.Vindex{
208225
Type: vindex.LookupVindexType,
209226
Params: map[string]string{
210-
"table": baseOptions.TableKeyspace + "." + vindex.TableName,
227+
"table": escapedTableKeyspace + "." + escapedTableName,
211228
"from": strings.Join(vindex.TableOwnerColumns, ","),
212229
"to": "keyspace_id",
213230
"ignore_nulls": fmt.Sprintf("%t", vindex.IgnoreNulls),

go/test/endtoend/vreplication/cluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func getClusterOptions(opts *clusterOptions) *clusterOptions {
360360
opts = &clusterOptions{}
361361
}
362362
if opts.cells == nil {
363-
opts.cells = []string{"zone1"}
363+
opts.cells = []string{defaultCellName}
364364
}
365365
if opts.clusterConfig == nil {
366366
opts.clusterConfig = mainClusterConfig

go/test/endtoend/vreplication/config_test.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -431,44 +431,44 @@ create table ukTable (id1 int not null, id2 int not null, name varchar(20), uniq
431431
}
432432
}
433433
`
434-
materializeProductSpec = `
434+
materializeProductSpec = fmt.Sprintf(`
435435
{
436436
"workflow": "cproduct",
437-
"source_keyspace": "product",
438-
"target_keyspace": "customer",
437+
"source_keyspace": "%s",
438+
"target_keyspace": "%s",
439439
"table_settings": [{
440440
"target_table": "cproduct",
441441
"source_expression": "select * from product",
442442
"create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4"
443443
}]
444444
}
445-
`
445+
`, defaultSourceKs, defaultTargetKs)
446446

447-
materializeCustomerNameSpec = `
447+
materializeCustomerNameSpec = fmt.Sprintf(`
448448
{
449449
"workflow": "customer_name",
450-
"source_keyspace": "customer",
451-
"target_keyspace": "customer",
450+
"source_keyspace": "%s",
451+
"target_keyspace": "%s",
452452
"table_settings": [{
453453
"target_table": "customer_name",
454454
"source_expression": "select cid, name from customer",
455455
"create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))"
456456
}]
457457
}
458-
`
458+
`, defaultTargetKs, defaultTargetKs)
459459

460-
materializeCustomerTypeSpec = `
460+
materializeCustomerTypeSpec = fmt.Sprintf(`
461461
{
462462
"workflow": "enterprise_customer",
463-
"source_keyspace": "customer",
464-
"target_keyspace": "customer",
463+
"source_keyspace": "%s",
464+
"target_keyspace": "%s",
465465
"table_settings": [{
466466
"target_table": "enterprise_customer",
467467
"source_expression": "select cid, name, typ from customer where typ = 'enterprise'",
468468
"create_ddl": "create table if not exists enterprise_customer (cid bigint not null, name varchar(128), typ varchar(64), primary key(cid), key(typ))"
469469
}]
470470
}
471-
`
471+
`, defaultTargetKs, defaultTargetKs)
472472

473473
merchantOrdersVSchema = `
474474
{
@@ -512,31 +512,31 @@ create table ukTable (id1 int not null, id2 int not null, name varchar(20), uniq
512512
`
513513

514514
// the merchant-type keyspace allows us to test keyspace names with special characters in them (dash)
515-
materializeMerchantOrdersSpec = `
515+
materializeMerchantOrdersSpec = fmt.Sprintf(`
516516
{
517517
"workflow": "morders",
518-
"source_keyspace": "customer",
518+
"source_keyspace": "%s",
519519
"target_keyspace": "merchant-type",
520520
"table_settings": [{
521521
"target_table": "morders",
522522
"source_expression": "select oid, cid, mname, pid, price, qty, total from orders",
523523
"create_ddl": "create table morders(oid int, cid int, mname varchar(128), pid int, price int, qty int, total int, total2 int as (10 * total), primary key(oid)) CHARSET=utf8"
524524
}]
525525
}
526-
`
526+
`, defaultTargetKs)
527527

528-
materializeMerchantSalesSpec = `
528+
materializeMerchantSalesSpec = fmt.Sprintf(`
529529
{
530530
"workflow": "msales",
531-
"source_keyspace": "customer",
531+
"source_keyspace": "%s",
532532
"target_keyspace": "merchant-type",
533533
"table_settings": [{
534534
"target_table": "msales",
535535
"source_expression": "select mname as merchant_name, count(*) as kount, sum(price) as amount from orders group by merchant_name",
536536
"create_ddl": "create table msales(merchant_name varchar(128), kount int, amount int, primary key(merchant_name)) CHARSET=utf8"
537537
}]
538538
}
539-
`
539+
`, defaultTargetKs)
540540

541541
materializeSalesVSchema = `
542542
{
@@ -552,30 +552,30 @@ create table ukTable (id1 int not null, id2 int not null, name varchar(20), uniq
552552
}
553553
}
554554
`
555-
materializeSalesSpec = `
555+
materializeSalesSpec = fmt.Sprintf(`
556556
{
557557
"workflow": "sales",
558-
"source_keyspace": "customer",
559-
"target_keyspace": "product",
558+
"source_keyspace": "%s",
559+
"target_keyspace": "%s",
560560
"table_settings": [{
561561
"target_Table": "sales",
562562
"source_expression": "select pid, count(*) as kount, sum(price) as amount from orders group by pid",
563563
"create_ddl": "create table sales(pid int, kount int, amount int, primary key(pid)) CHARSET=utf8"
564564
}]
565565
}
566-
`
567-
materializeRollupSpec = `
566+
`, defaultTargetKs, defaultSourceKs)
567+
materializeRollupSpec = fmt.Sprintf(`
568568
{
569569
"workflow": "rollup",
570-
"source_keyspace": "product",
571-
"target_keyspace": "product",
570+
"source_keyspace": "%s",
571+
"target_keyspace": "%s",
572572
"table_settings": [{
573573
"target_table": "rollup",
574574
"source_expression": "select 'total' as rollupname, count(*) as kount from product group by rollupname",
575575
"create_ddl": "create table rollup(rollupname varchar(100), kount int, primary key (rollupname)) CHARSET=utf8mb4"
576576
}]
577577
}
578-
`
578+
`, defaultSourceKs, defaultSourceKs)
579579
initialExternalSchema = `
580580
create table review(rid int, pid int, review varbinary(128), primary key(rid));
581581
create table rating(gid int, pid int, rating int, primary key(gid));

go/test/endtoend/vreplication/fk_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func TestFKWorkflow(t *testing.T) {
5858
defer vc.TearDown()
5959

6060
cell := vc.Cells[cellName]
61-
vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialFKSourceVSchema, initialFKSchema, 0, 0, 100, sourceKsOpts)
61+
vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialFKSourceVSchema, initialFKSchema, 0, 0, 100, defaultSourceKsOpts)
6262

6363
verifyClusterHealth(t, vc)
6464
insertInitialFKData(t)
@@ -82,7 +82,7 @@ func TestFKWorkflow(t *testing.T) {
8282

8383
targetKeyspace := "fktarget"
8484
targetTabletId := 200
85-
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, sourceKsOpts)
85+
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialFKTargetVSchema, "", 0, 0, targetTabletId, defaultSourceKsOpts)
8686

8787
testFKCancel(t, vc)
8888

go/test/endtoend/vreplication/helper_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,11 @@ func waitForRowCountInTablet(t *testing.T, vttablet *cluster.VttabletProcess, da
288288
// Note: you specify the number of values that you want to reserve
289289
// and you get back the max value reserved.
290290
func waitForSequenceValue(t *testing.T, conn *mysql.Conn, database, sequence string, numVals int) int64 {
291-
query := fmt.Sprintf("select next %d values from %s.%s", numVals, database, sequence)
291+
escapedDB, err := sqlescape.EnsureEscaped(database)
292+
require.NoError(t, err)
293+
escapedSeq, err := sqlescape.EnsureEscaped(sequence)
294+
require.NoError(t, err)
295+
query := fmt.Sprintf("select next %d values from %s.%s", numVals, escapedDB, escapedSeq)
292296
timer := time.NewTimer(defaultTimeout)
293297
defer timer.Stop()
294298
for {
@@ -545,7 +549,7 @@ func validateDryRunResults(t *testing.T, output string, want []string) {
545549
}
546550
if !match {
547551
fail = true
548-
require.Fail(t, "invlaid dry run results", "want %s, got %s\n", w, gotDryRun[i])
552+
require.Fail(t, "invalid dry run results", "want %s, got %s\n", w, gotDryRun[i])
549553
}
550554
}
551555
if fail {
@@ -646,11 +650,11 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) {
646650
return string(val), nil
647651
}
648652

649-
func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) {
653+
func confirmWorkflowHasCopiedNoData(t *testing.T, defaultTargetKs, workflow string) {
650654
timer := time.NewTimer(defaultTimeout)
651655
defer timer.Stop()
652656
for {
653-
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", workflow, "--compact", "--include-logs=false")
657+
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", defaultTargetKs, "show", "--workflow", workflow, "--compact", "--include-logs=false")
654658
require.NoError(t, err, output)
655659
streams := gjson.Get(output, "workflows.0.shard_streams.*.streams")
656660
streams.ForEach(func(streamId, stream gjson.Result) bool { // For each stream
@@ -662,7 +666,7 @@ func confirmWorkflowHasCopiedNoData(t *testing.T, targetKS, workflow string) {
662666
(pos.Exists() && pos.String() != "") {
663667
require.FailNowf(t, "Unexpected data copied in workflow",
664668
"The MoveTables workflow %q copied data in less than %s when it should have been waiting. Show output: %s",
665-
ksWorkflow, defaultTimeout, output)
669+
defaultKsWorkflow, defaultTimeout, output)
666670
}
667671
return true
668672
})

go/test/endtoend/vreplication/initial_data_test.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ func insertInitialData(t *testing.T) {
3131
defer closeConn()
3232
log.Infof("Inserting initial data")
3333
lines, _ := os.ReadFile("unsharded_init_data.sql")
34-
execMultipleQueries(t, vtgateConn, "product:0", string(lines))
35-
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
36-
execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
37-
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
34+
execMultipleQueries(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), string(lines))
35+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
36+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
37+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
3838
log.Infof("Done inserting initial data")
3939

40-
waitForRowCount(t, vtgateConn, "product:0", "product", 2)
41-
waitForRowCount(t, vtgateConn, "product:0", "customer", 3)
42-
waitForQueryResult(t, vtgateConn, "product:0", "select * from merchant",
40+
waitForRowCount(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "product", 2)
41+
waitForRowCount(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "customer", 3)
42+
waitForQueryResult(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "select * from merchant",
4343
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)
4444

4545
insertJSONValues(t)
@@ -52,12 +52,12 @@ func insertJSONValues(t *testing.T) {
5252
// insert null value combinations
5353
vtgateConn, closeConn := getVTGateConn()
5454
defer closeConn()
55-
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(1, \"{}\")")
56-
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
57-
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
58-
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
59-
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
60-
execVtgateQuery(t, vtgateConn, "product:0", "insert into json_tbl(id, j3) values(6, '{}')")
55+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j3) values(1, \"{}\")")
56+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j1, j3) values(2, \"{}\", \"{}\")")
57+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j2, j3) values(3, \"{}\", \"{}\")")
58+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j1, j2, j3) values(4, NULL, 'null', '\"null\"')")
59+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j3) values(5, JSON_QUOTE('null'))")
60+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), "insert into json_tbl(id, j3) values(6, '{}')")
6161

6262
id := 8 // 6 inserted above and one after copy phase is done
6363

@@ -68,7 +68,7 @@ func insertJSONValues(t *testing.T) {
6868
j1 := rand.IntN(numJsonValues)
6969
j2 := rand.IntN(numJsonValues)
7070
query := fmt.Sprintf(q, id, jsonValues[j1], jsonValues[j2])
71-
execVtgateQuery(t, vtgateConn, "product:0", query)
71+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query)
7272
}
7373
}
7474

@@ -82,7 +82,7 @@ func insertMoreCustomers(t *testing.T, numCustomers int) {
8282
// that we reserved.
8383
vtgateConn, closeConn := getVTGateConn()
8484
defer closeConn()
85-
maxID := waitForSequenceValue(t, vtgateConn, "product", "customer_seq", numCustomers)
85+
maxID := waitForSequenceValue(t, vtgateConn, defaultSourceKs, "customer_seq", numCustomers)
8686
// So we need to calculate the first value we reserved
8787
// from the max.
8888
cid := maxID - int64(numCustomers)
@@ -97,28 +97,28 @@ func insertMoreCustomers(t *testing.T, numCustomers int) {
9797
}
9898
cid++
9999
}
100-
execVtgateQuery(t, vtgateConn, "customer", sql)
100+
execVtgateQuery(t, vtgateConn, defaultTargetKs, sql)
101101
}
102102

103103
func insertMoreProducts(t *testing.T) {
104104
vtgateConn, closeConn := getVTGateConn()
105105
defer closeConn()
106106
sql := "insert into product(pid, description) values(3, 'cpu'),(4, 'camera'),(5, 'mouse');"
107-
execVtgateQuery(t, vtgateConn, "product", sql)
107+
execVtgateQuery(t, vtgateConn, defaultSourceKs, sql)
108108
}
109109

110110
func insertMoreProductsForSourceThrottler(t *testing.T) {
111111
vtgateConn, closeConn := getVTGateConn()
112112
defer closeConn()
113113
sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');"
114-
execVtgateQuery(t, vtgateConn, "product", sql)
114+
execVtgateQuery(t, vtgateConn, defaultSourceKs, sql)
115115
}
116116

117117
func insertMoreProductsForTargetThrottler(t *testing.T) {
118118
vtgateConn, closeConn := getVTGateConn()
119119
defer closeConn()
120120
sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');"
121-
execVtgateQuery(t, vtgateConn, "product", sql)
121+
execVtgateQuery(t, vtgateConn, defaultSourceKs, sql)
122122
}
123123

124124
var blobTableQueries = []string{
@@ -137,6 +137,6 @@ func insertIntoBlobTable(t *testing.T) {
137137
vtgateConn, closeConn := getVTGateConn()
138138
defer closeConn()
139139
for _, query := range blobTableQueries {
140-
execVtgateQuery(t, vtgateConn, "product:0", query)
140+
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query)
141141
}
142142
}

0 commit comments

Comments
 (0)