Skip to content

Commit b9a6c9b

Browse files
[release-22.0] Topo: Add NamedLock test for zk2 and consul and get them passing (#18407) (#18410)
Signed-off-by: Matt Lord <[email protected]> Signed-off-by: Harshit Gangal <[email protected]> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Harshit Gangal <[email protected]>
1 parent 404ed04 commit b9a6c9b

File tree

6 files changed

+106
-38
lines changed

6 files changed

+106
-38
lines changed

go/test/endtoend/topotest/consul/main_test.go

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,15 @@ import (
2424
"testing"
2525
"time"
2626

27-
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
28-
"vitess.io/vitess/go/vt/log"
29-
"vitess.io/vitess/go/vt/topo"
30-
3127
"github.com/google/go-cmp/cmp"
3228
"github.com/stretchr/testify/require"
3329

3430
"vitess.io/vitess/go/mysql"
3531
"vitess.io/vitess/go/sqltypes"
3632
"vitess.io/vitess/go/test/endtoend/cluster"
33+
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
34+
"vitess.io/vitess/go/vt/log"
35+
"vitess.io/vitess/go/vt/topo"
3736
)
3837

3938
var (
@@ -221,6 +220,53 @@ func TestKeyspaceLocking(t *testing.T) {
221220
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
222221
}
223222

223+
// TestNamedLocking tests that named locking works as intended.
224+
func TestNamedLocking(t *testing.T) {
225+
// Create topo server connection.
226+
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
227+
require.NoError(t, err)
228+
229+
ctx := context.Background()
230+
lockName := "TestNamedLocking"
231+
action := "Testing"
232+
233+
// Acquire a named lock.
234+
ctx, unlock, err := ts.LockName(ctx, lockName, action)
235+
require.NoError(t, err)
236+
237+
// Check that we can't reacquire it from the same context.
238+
_, _, err = ts.LockName(ctx, lockName, action)
239+
require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))
240+
241+
// Check that CheckNameLocked doesn't return an error as we should still be
242+
// holding the lock.
243+
err = topo.CheckNameLocked(ctx, lockName)
244+
require.NoError(t, err)
245+
246+
// We'll now try to acquire the lock from a different goroutine.
247+
secondCallerAcquired := false
248+
go func() {
249+
_, unlock, err := ts.LockName(context.Background(), lockName, action)
250+
defer unlock(&err)
251+
require.NoError(t, err)
252+
secondCallerAcquired = true
253+
}()
254+
255+
// Wait for some time and ensure that the second attempt at acquiring the lock
256+
// is blocked.
257+
time.Sleep(100 * time.Millisecond)
258+
require.False(t, secondCallerAcquired)
259+
260+
// Unlock the name.
261+
unlock(&err)
262+
// Check that we no longer have the named lock.
263+
err = topo.CheckNameLocked(ctx, lockName)
264+
require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))
265+
266+
// Wait to see that the second goroutine WAS now able to acquire the named lock.
267+
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
268+
}
269+
224270
func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
225271
t.Helper()
226272
qr, err := conn.ExecuteFetch(query, 1000, true)

go/test/endtoend/topotest/zk2/main_test.go

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,20 @@ package zk2
1919
import (
2020
"context"
2121
"flag"
22+
"fmt"
2223
"os"
2324
"testing"
2425
"time"
2526

26-
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
27-
"vitess.io/vitess/go/test/endtoend/utils"
28-
"vitess.io/vitess/go/vt/topo"
29-
30-
"vitess.io/vitess/go/vt/log"
31-
3227
"github.com/stretchr/testify/require"
3328

3429
"vitess.io/vitess/go/mysql"
3530
"vitess.io/vitess/go/sqltypes"
3631
"vitess.io/vitess/go/test/endtoend/cluster"
32+
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
33+
"vitess.io/vitess/go/test/endtoend/utils"
34+
"vitess.io/vitess/go/vt/log"
35+
"vitess.io/vitess/go/vt/topo"
3736
)
3837

3938
var (
@@ -97,6 +96,53 @@ func TestMain(m *testing.M) {
9796
os.Exit(exitCode)
9897
}
9998

99+
// TestNamedLocking tests that named locking works as intended.
100+
func TestNamedLocking(t *testing.T) {
101+
// Create topo server connection.
102+
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
103+
require.NoError(t, err)
104+
105+
ctx := context.Background()
106+
lockName := "TestNamedLocking"
107+
action := "Testing"
108+
109+
// Acquire a named lock.
110+
ctx, unlock, err := ts.LockName(ctx, lockName, action)
111+
require.NoError(t, err)
112+
113+
// Check that we can't reacquire it from the same context.
114+
_, _, err = ts.LockName(ctx, lockName, action)
115+
require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))
116+
117+
// Check that CheckNameLocked doesn't return an error as we should still be
118+
// holding the lock.
119+
err = topo.CheckNameLocked(ctx, lockName)
120+
require.NoError(t, err)
121+
122+
// We'll now try to acquire the lock from a different goroutine.
123+
secondCallerAcquired := false
124+
go func() {
125+
_, unlock, err := ts.LockName(context.Background(), lockName, action)
126+
defer unlock(&err)
127+
require.NoError(t, err)
128+
secondCallerAcquired = true
129+
}()
130+
131+
// Wait for some time and ensure that the second attempt at acquiring the lock
132+
// is blocked.
133+
time.Sleep(100 * time.Millisecond)
134+
require.False(t, secondCallerAcquired)
135+
136+
// Unlock the name.
137+
unlock(&err)
138+
// Check that we no longer have the named lock.
139+
err = topo.CheckNameLocked(ctx, lockName)
140+
require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))
141+
142+
// Wait to see that the second goroutine WAS now able to acquire the named lock.
143+
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
144+
}
145+
100146
func TestTopoDownServingQuery(t *testing.T) {
101147
ctx := context.Background()
102148
vtParams := mysql.ConnParams{

go/vt/topo/test/lock.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,6 @@ func checkLock(t *testing.T, ctx context.Context, ts *topo.Server) {
4747
t.Log("=== checkLockTimeout")
4848
checkLockTimeout(ctx, t, conn)
4949

50-
t.Log("=== checkLockMissing")
51-
checkLockMissing(ctx, t, conn)
52-
5350
t.Log("=== checkLockUnblocks")
5451
checkLockUnblocks(ctx, t, conn)
5552
}
@@ -121,14 +118,6 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) {
121118
}
122119
}
123120

124-
// checkLockMissing makes sure we can't lock a non-existing directory.
125-
func checkLockMissing(ctx context.Context, t *testing.T, conn topo.Conn) {
126-
keyspacePath := path.Join(topo.KeyspacesPath, "test_keyspace_666")
127-
if _, err := conn.Lock(ctx, keyspacePath, "missing"); err == nil {
128-
t.Fatalf("Lock(test_keyspace_666) worked for non-existing keyspace")
129-
}
130-
}
131-
132121
// checkLockUnblocks makes sure that a routine waiting on a lock
133122
// is unblocked when another routine frees the lock
134123
func checkLockUnblocks(ctx context.Context, t *testing.T, conn topo.Conn) {

go/vt/topo/test/trylock.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,6 @@ func checkTryLock(t *testing.T, ctx context.Context, ts *topo.Server) {
4444
t.Log("=== checkTryLockTimeout")
4545
checkTryLockTimeout(ctx, t, conn)
4646

47-
t.Log("=== checkTryLockMissing")
48-
checkTryLockMissing(ctx, t, conn)
49-
5047
t.Log("=== checkTryLockUnblocks")
5148
checkTryLockUnblocks(ctx, t, conn)
5249
}
@@ -142,14 +139,6 @@ func checkTryLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) {
142139
}
143140
}
144141

145-
// checkTryLockMissing makes sure we can't lock a non-existing directory.
146-
func checkTryLockMissing(ctx context.Context, t *testing.T, conn topo.Conn) {
147-
keyspacePath := path.Join(topo.KeyspacesPath, "test_keyspace_666")
148-
if _, err := conn.TryLock(ctx, keyspacePath, "missing"); err == nil {
149-
require.Fail(t, "TryLock(test_keyspace_666) worked for non-existing keyspace")
150-
}
151-
}
152-
153142
// unlike 'checkLockUnblocks', checkTryLockUnblocks will not block on other client but instead
154143
// keep retrying until it gets the lock.
155144
func checkTryLockUnblocks(ctx context.Context, t *testing.T, conn topo.Conn) {

go/vt/topo/zk2topo/lock.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ func (zs *Server) lock(ctx context.Context, dirPath, contents string) (topo.Lock
8888
// sequential nodes, they are created as children, not siblings.
8989
locksDir := path.Join(zs.root, dirPath, locksPath) + "/"
9090

91-
// Create the locks path, possibly creating the parent.
92-
nodePath, err := CreateRecursive(ctx, zs.conn, locksDir, []byte(contents), zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(PermFile), 1)
91+
// Create the lock path, creating the parents as needed.
92+
nodePath, err := CreateRecursive(ctx, zs.conn, locksDir, []byte(contents), zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(PermFile), -1)
9393
if err != nil {
9494
return nil, convertError(err, locksDir)
9595
}

go/vt/topo/zk2topo/utils.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@ limitations under the License.
1717
package zk2topo
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"path"
2223
"sort"
2324
"strings"
2425
"sync"
2526

26-
"context"
27-
2827
"github.com/z-division/go-zookeeper/zk"
2928

29+
"vitess.io/vitess/go/fileutil"
3030
"vitess.io/vitess/go/vt/log"
3131
"vitess.io/vitess/go/vt/vterrors"
32-
33-
"vitess.io/vitess/go/fileutil"
3432
)
3533

3634
// CreateRecursive is a helper function on top of Create. It will

0 commit comments

Comments
 (0)