Skip to content

Commit 601ccea

Browse files
committed
fix: Resharding bug
--story=1
1 parent 700e2a8 commit 601ccea

File tree

2 files changed

+39
-22
lines changed

2 files changed

+39
-22
lines changed

shardingdb.go

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -263,27 +263,45 @@ func (sdb *ShardingDb) Infof(msg string, a ...interface{}) {
263263
func (sdb *ShardingDb) Resharding() error {
264264
sdb.lock.Lock()
265265
defer sdb.lock.Unlock()
266-
for i, dbHandle := range sdb.dbHandles {
267-
iter := dbHandle.NewIterator(nil, nil)
268-
sdb.Infof("Resharding db[%d]", i)
269-
for iter.Next() {
270-
key := iter.Key()
271-
value := iter.Value()
272-
dbIndex := sdb.shardingFunc(key, sdb.length)
273-
if dbIndex != uint16(i) {
274-
sdb.Debugf("Move kv from db[%d] to db[%d]", i, dbIndex)
275-
if err := sdb.dbHandles[dbIndex].Put(key, value, nil); err != nil {
276-
iter.Release()
277-
return err
278-
}
279-
if err := dbHandle.Delete(key, nil); err != nil {
280-
iter.Release()
281-
return err
266+
//get all snapshots
267+
snapshots := make([]Snapshot, sdb.length)
268+
for idx, dbHandle := range sdb.dbHandles {
269+
snapshot, err := dbHandle.GetSnapshot()
270+
if err != nil {
271+
return err
272+
}
273+
snapshots[idx] = snapshot
274+
}
275+
wg := sync.WaitGroup{}
276+
for x, snapshot := range snapshots {
277+
wg.Add(1)
278+
//concurrent resharding
279+
go func(index int, dbReader Snapshot) {
280+
defer wg.Done()
281+
iter := dbReader.NewIterator(nil, nil)
282+
sdb.Infof("Resharding db[%d]", index)
283+
for iter.Next() {
284+
key := iter.Key()
285+
value := iter.Value()
286+
dbIndex := sdb.shardingFunc(key, sdb.length)
287+
if dbIndex != uint16(index) {
288+
sdb.Debugf("Move kv from db[%d] to db[%d]", index, dbIndex)
289+
if err := sdb.dbHandles[dbIndex].Put(key, value, nil); err != nil {
290+
iter.Release()
291+
panic(err)
292+
}
293+
//delete data from old db
294+
if err := sdb.dbHandles[index].Delete(key, nil); err != nil {
295+
iter.Release()
296+
panic(err)
297+
}
282298
}
283299
}
284-
}
285-
sdb.Infof("Resharding db[%d] finished", i)
286-
iter.Release()
300+
sdb.Infof("Resharding db[%d] finished", index)
301+
iter.Release()
302+
dbReader.Release()
303+
}(x, snapshot)
287304
}
305+
wg.Wait()
288306
return nil
289307
}

shardingdb_main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,8 @@ func Migration(dbReaders []LevelDbHandle, sdb *ShardingDb) error {
7373
wg := sync.WaitGroup{}
7474
for i, dbHandle := range dbReaders {
7575
wg.Add(1)
76-
dbReader := dbHandle
7776
//concurrent resharding
78-
go func(index int) {
77+
go func(index int, dbReader LevelDbHandle) {
7978
defer wg.Done()
8079
iter := dbReader.NewIterator(nil, nil)
8180
sdb.Infof("Resharding db[%d]", index)
@@ -92,7 +91,7 @@ func Migration(dbReaders []LevelDbHandle, sdb *ShardingDb) error {
9291
}
9392
iter.Release()
9493
sdb.Infof("Resharding db[%d] finished", index)
95-
}(i)
94+
}(i, dbHandle)
9695
}
9796
wg.Wait()
9897
return nil

0 commit comments

Comments
 (0)