Skip to content

Commit 1f49de4

Browse files
Add new force flag to DemotePrimary to force a demotion even when blocked on waiting for semi-sync acks (#18714)
Signed-off-by: Arthur Schreiber <[email protected]> Signed-off-by: Tim Vaillancourt <[email protected]> Co-authored-by: Tim Vaillancourt <[email protected]>
1 parent d0249c7 commit 1f49de4

File tree

28 files changed

+398
-40
lines changed

28 files changed

+398
-40
lines changed

go/test/endtoend/cluster/vttablet_process.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,20 @@ func (vttablet *VttabletProcess) TearDown() error {
419419
return vttablet.TearDownWithTimeout(vttabletStateTimeout)
420420
}
421421

422+
func (vttablet *VttabletProcess) Stop() {
423+
if vttablet.proc == nil || vttablet.exit == nil {
424+
return
425+
}
426+
vttablet.proc.Process.Signal(syscall.SIGSTOP)
427+
}
428+
429+
func (vttablet *VttabletProcess) Resume() {
430+
if vttablet.proc == nil || vttablet.exit == nil {
431+
return
432+
}
433+
vttablet.proc.Process.Signal(syscall.SIGCONT)
434+
}
435+
422436
// Kill shuts down the running vttablet service immediately.
423437
func (vttablet *VttabletProcess) Kill() error {
424438
if vttablet.proc == nil || vttablet.exit == nil {

go/test/endtoend/reparent/emergencyreparent/ers_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@ package emergencyreparent
1919
import (
2020
"context"
2121
"os/exec"
22+
"sync"
2223
"testing"
2324
"time"
2425

26+
"github.com/stretchr/testify/assert"
2527
"github.com/stretchr/testify/require"
2628

2729
"vitess.io/vitess/go/mysql"
2830
"vitess.io/vitess/go/test/endtoend/cluster"
2931
"vitess.io/vitess/go/test/endtoend/reparent/utils"
3032
"vitess.io/vitess/go/vt/log"
3133
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
34+
35+
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
3236
)
3337

3438
func TestTrivialERS(t *testing.T) {
@@ -131,6 +135,131 @@ func TestReparentDownPrimary(t *testing.T) {
131135
utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0])
132136
}
133137

138+
func TestEmergencyReparentWithBlockedPrimary(t *testing.T) {
139+
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
140+
defer utils.TeardownCluster(clusterInstance)
141+
142+
if clusterInstance.VtTabletMajorVersion < 24 {
143+
t.Skip("Skipping test since `DemotePrimary` on earlier versions does not handle blocked primaries correctly")
144+
}
145+
146+
// start vtgate w/disabled buffering
147+
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
148+
"--enable_buffer=false",
149+
"--query-timeout", "3000")
150+
err := clusterInstance.StartVtgate()
151+
require.NoError(t, err)
152+
153+
ctx, cancel := context.WithCancel(context.Background())
154+
defer cancel()
155+
156+
conn, err := mysql.Connect(ctx, &mysql.ConnParams{
157+
Host: clusterInstance.Hostname,
158+
Port: clusterInstance.VtgateMySQLPort,
159+
})
160+
require.NoError(t, err)
161+
defer conn.Close()
162+
163+
_, err = conn.ExecuteFetch("CREATE TABLE test (id INT PRIMARY KEY, msg VARCHAR(64))", 0, false)
164+
require.NoError(t, err)
165+
166+
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
167+
168+
// Simulate no semi-sync replicas being available by disabling semi-sync on all replicas
169+
for _, tablet := range tablets[1:] {
170+
utils.RunSQL(ctx, t, "STOP REPLICA IO_THREAD", tablet)
171+
172+
// Disable semi-sync on replicas to simulate blocking
173+
semisyncType, err := utils.SemiSyncExtensionLoaded(context.Background(), tablet)
174+
require.NoError(t, err)
175+
switch semisyncType {
176+
case mysql.SemiSyncTypeSource:
177+
utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_replica_enabled = false", tablet)
178+
case mysql.SemiSyncTypeMaster:
179+
utils.RunSQL(context.Background(), t, "SET GLOBAL rpl_semi_sync_slave_enabled = false", tablet)
180+
}
181+
182+
utils.RunSQL(context.Background(), t, "START REPLICA IO_THREAD", tablet)
183+
}
184+
185+
// Try performing a write and ensure that it blocks.
186+
writeSQL := `insert into test(id, msg) values (1, 'test 1')`
187+
wg := sync.WaitGroup{}
188+
wg.Add(1)
189+
go func() {
190+
defer wg.Done()
191+
192+
// Attempt writing via vtgate against the primary. This should block (because there's no replicas to ack the semi-sync),
193+
// and fail on the vtgate query timeout. Async replicas will still receive this write (probably), because it is written
194+
// to the PRIMARY binlog even when no ackers exist. This means we need to disable the vtgate buffer (above), because it
195+
// will attempt the write on the promoted, unblocked primary - and this will hit a dupe key error.
196+
_, err := conn.ExecuteFetch(writeSQL, 0, false)
197+
require.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100) during query: "+writeSQL)
198+
199+
// Verify vtgate really processed the insert in case something unrelated caused the deadline exceeded.
200+
vtgateVars := clusterInstance.VtgateProcess.GetVars()
201+
require.NotNil(t, vtgateVars)
202+
require.NotNil(t, vtgateVars["QueryRoutes"])
203+
require.NotNil(t, vtgateVars["VtgateApiErrorCounts"])
204+
require.EqualValues(t, map[string]interface{}{
205+
"DDL.DirectDDL.PRIMARY": float64(1),
206+
"INSERT.Passthrough.PRIMARY": float64(1),
207+
}, vtgateVars["QueryRoutes"])
208+
require.EqualValues(t, map[string]interface{}{
209+
"Execute.ks.primary.DEADLINE_EXCEEDED": float64(1),
210+
}, vtgateVars["VtgateApiErrorCounts"])
211+
}()
212+
213+
wg.Add(1)
214+
waitReplicasTimeout := time.Second * 10
215+
go func() {
216+
defer wg.Done()
217+
218+
// Ensure the write (other goroutine above) is blocked waiting on ACKs on the primary.
219+
utils.WaitForQueryWithStateInProcesslist(context.Background(), t, tablets[0], writeSQL, "Waiting for semi-sync ACK from replica", time.Second*20)
220+
221+
// Send SIGSTOP to primary to simulate it being unresponsive.
222+
tablets[0].VttabletProcess.Stop()
223+
224+
// Run forced reparent operation, this should now proceed unimpeded.
225+
out, err := utils.Ers(clusterInstance, tablets[1], "15s", waitReplicasTimeout.String())
226+
require.NoError(t, err, out)
227+
}()
228+
229+
wg.Wait()
230+
231+
// We need to wait at least 10 seconds here to ensure the wait-for-replicas-timeout has passed,
232+
// before we resume the old primary - otherwise the old primary will receive a `SetReplicationSource` call.
233+
time.Sleep(waitReplicasTimeout * 2)
234+
235+
// Bring back the demoted primary
236+
tablets[0].VttabletProcess.Resume()
237+
238+
// Give the old primary some time to realize it's no longer the primary,
239+
// and for a new primary to be promoted.
240+
require.EventuallyWithT(t, func(c *assert.CollectT) {
241+
// Ensure the old primary was demoted correctly
242+
tabletInfo, err := clusterInstance.VtctldClientProcess.GetTablet(tablets[0].Alias)
243+
require.NoError(c, err)
244+
245+
// The old primary should have noticed there's a new primary tablet now and should
246+
// have demoted itself to REPLICA.
247+
require.Equal(c, topodatapb.TabletType_REPLICA, tabletInfo.GetType())
248+
249+
// The old primary should be in not serving mode because we should be unable to re-attach it
250+
// as a replica due to the errant GTID caused by semi-sync writes that were never replicated out.
251+
//
252+
// Note: The writes that were not replicated were caused by the semi sync unblocker, which
253+
// performed writes after ERS.
254+
require.Equal(c, "NOT_SERVING", tablets[0].VttabletProcess.GetTabletStatus())
255+
require.Equal(c, "replica", tablets[0].VttabletProcess.GetTabletType())
256+
257+
// Check the 2nd tablet becomes PRIMARY.
258+
require.Equal(c, "SERVING", tablets[1].VttabletProcess.GetTabletStatus())
259+
require.Equal(c, "primary", tablets[1].VttabletProcess.GetTabletType())
260+
}, 30*time.Second, time.Second, "could not validate primary was demoted")
261+
}
262+
134263
func TestReparentNoChoiceDownPrimary(t *testing.T) {
135264
clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
136265
defer utils.TeardownCluster(clusterInstance)

go/test/endtoend/reparent/utils/utils.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,15 @@ import (
3232
"github.com/stretchr/testify/assert"
3333
"github.com/stretchr/testify/require"
3434

35-
querypb "vitess.io/vitess/go/vt/proto/query"
36-
"vitess.io/vitess/go/vt/utils"
37-
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
38-
"vitess.io/vitess/go/vt/vttablet/tabletconn"
39-
4035
"vitess.io/vitess/go/mysql"
4136
"vitess.io/vitess/go/sqltypes"
4237
"vitess.io/vitess/go/test/endtoend/cluster"
4338
"vitess.io/vitess/go/vt/log"
39+
querypb "vitess.io/vitess/go/vt/proto/query"
4440
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
41+
"vitess.io/vitess/go/vt/utils"
42+
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
43+
"vitess.io/vitess/go/vt/vttablet/tabletconn"
4544
)
4645

4746
var (
@@ -843,6 +842,7 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
843842
}
844843
}
845844

845+
// WaitForTabletToBeServing waits for a tablet to reach a serving state.
846846
func WaitForTabletToBeServing(ctx context.Context, t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) {
847847
vTablet, err := clusterInstance.VtctldClientProcess.GetTablet(tablet.Alias)
848848
require.NoError(t, err)
@@ -863,3 +863,22 @@ func WaitForTabletToBeServing(ctx context.Context, t *testing.T, clusterInstance
863863
t.Fatal(err.Error())
864864
}
865865
}
866+
867+
// WaitForQueryWithStateInProcesslist waits for a query to be present in the processlist with a specific state.
868+
func WaitForQueryWithStateInProcesslist(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sql, state string, timeout time.Duration) {
869+
require.Eventually(t, func() bool {
870+
qr := RunSQL(ctx, t, "select Command, State, Info from information_schema.processlist", tablet)
871+
for _, row := range qr.Rows {
872+
if len(row) != 3 {
873+
continue
874+
}
875+
if strings.EqualFold(row[0].ToString(), "Query") {
876+
continue
877+
}
878+
if strings.EqualFold(row[1].ToString(), state) && strings.EqualFold(row[2].ToString(), sql) {
879+
return true
880+
}
881+
}
882+
return false
883+
}, timeout, time.Second, "query with state not in processlist")
884+
}

go/vt/mysqlctl/fakemysqldaemon.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,11 @@ func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus(ctx context.Context) (bool
769769
return fmd.SemiSyncReplicaEnabled, nil
770770
}
771771

772+
// IsSemiSyncBlocked is part of the MysqlDaemon interface.
773+
func (fmd *FakeMysqlDaemon) IsSemiSyncBlocked(ctx context.Context) (bool, error) {
774+
return false, nil
775+
}
776+
772777
// GetVersionString is part of the MysqlDaemon interface.
773778
func (fmd *FakeMysqlDaemon) GetVersionString(ctx context.Context) (string, error) {
774779
return fmd.Version, nil

go/vt/mysqlctl/mysql_daemon.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type MysqlDaemon interface {
7474
SemiSyncClients(ctx context.Context) (count uint32)
7575
SemiSyncSettings(ctx context.Context) (timeout uint64, numReplicas uint32)
7676
SemiSyncReplicationStatus(ctx context.Context) (bool, error)
77+
IsSemiSyncBlocked(ctx context.Context) (bool, error)
7778
ResetReplicationParameters(ctx context.Context) error
7879
GetBinlogInformation(ctx context.Context) (binlogFormat string, logEnabled bool, logReplicaUpdate bool, binlogRowImage string, err error)
7980
GetGTIDMode(ctx context.Context) (gtidMode string, err error)

go/vt/mysqlctl/replication.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,3 +819,30 @@ func (mysqld *Mysqld) SemiSyncExtensionLoaded(ctx context.Context) (mysql.SemiSy
819819

820820
return conn.Conn.SemiSyncExtensionLoaded()
821821
}
822+
823+
func (mysqld *Mysqld) IsSemiSyncBlocked(ctx context.Context) (bool, error) {
824+
conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
825+
if err != nil {
826+
return false, err
827+
}
828+
defer conn.Recycle()
829+
830+
// Execute the query to check if the primary is blocked on semi-sync.
831+
semiSyncWaitSessionsRead := "select variable_value from performance_schema.global_status where regexp_like(variable_name, 'Rpl_semi_sync_(source|master)_wait_sessions')"
832+
res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false)
833+
if err != nil {
834+
return false, err
835+
}
836+
// If we have no rows, then the primary doesn't have semi-sync enabled.
837+
// It then follows, that the primary isn't blocked :)
838+
if len(res.Rows) == 0 {
839+
return false, nil
840+
}
841+
842+
// Read the status value and check if it is non-zero.
843+
if len(res.Rows) != 1 || len(res.Rows[0]) != 1 {
844+
return false, fmt.Errorf("unexpected number of rows received - %v", res.Rows)
845+
}
846+
value, err := res.Rows[0][0].ToCastInt64()
847+
return value != 0, err
848+
}

go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Lines changed: 34 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/vtcombo/tablet_map.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ func (itmc *internalTabletManagerClient) ReadReparentJournalInfo(ctx context.Con
10171017
return 0, errors.New("not implemented in vtcombo")
10181018
}
10191019

1020-
func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) {
1020+
func (itmc *internalTabletManagerClient) DemotePrimary(context.Context, *topodatapb.Tablet, bool) (*replicationdatapb.PrimaryStatus, error) {
10211021
return nil, errors.New("not implemented in vtcombo")
10221022
}
10231023

go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ func (fake *TabletManagerClient) ChangeType(ctx context.Context, tablet *topodat
554554
}
555555

556556
// DemotePrimary is part of the tmclient.TabletManagerClient interface.
557-
func (fake *TabletManagerClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) {
557+
func (fake *TabletManagerClient) DemotePrimary(ctx context.Context, tablet *topodatapb.Tablet, force bool) (*replicationdatapb.PrimaryStatus, error) {
558558
if fake.DemotePrimaryResults == nil {
559559
return nil, assert.AnError
560560
}

0 commit comments

Comments
 (0)