Skip to content

Commit 7c0bd61

Browse files
mergify[bot] and banmoy authored
[BugFix] Fix schema change publish does not retry in shared-data (backport #64093) (#64204)
Signed-off-by: PengFei Li <[email protected]> Co-authored-by: PengFei Li <[email protected]>
1 parent b0a90fd commit 7c0bd61

File tree

2 files changed

+73
-1
lines changed

2 files changed

+73
-1
lines changed

fe/fe-core/src/main/java/com/starrocks/alter/AlterJobV2.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -348,13 +348,16 @@ protected boolean publishVersion() {
348348
return lakePublishVersion();
349349
};
350350
publishVersionFuture = GlobalStateMgr.getCurrentState().getLakeAlterPublishExecutor().submit(task);
351+
LOG.info("submit publish task for job: {}", jobId);
351352
return false;
352353
} else {
353354
if (publishVersionFuture.isDone()) {
354355
try {
355356
return publishVersionFuture.get();
356357
} catch (InterruptedException | ExecutionException e) {
357358
return false;
359+
} finally {
360+
publishVersionFuture = null;
358361
}
359362
} else {
360363
return false;

fe/fe-core/src/test/java/com/starrocks/alter/LakeTableSchemaChangeJobTest.java

Lines changed: 70 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -46,9 +46,12 @@
4646
import org.junit.jupiter.api.Test;
4747

4848
import java.io.IOException;
49+
import java.io.PrintWriter;
50+
import java.io.StringWriter;
4951
import java.util.Collection;
5052
import java.util.List;
5153
import java.util.concurrent.atomic.AtomicBoolean;
54+
import java.util.concurrent.atomic.AtomicInteger;
5255

5356
public class LakeTableSchemaChangeJobTest {
5457
private static final int NUM_BUCKETS = 4;
@@ -65,6 +68,16 @@ public LakeTableSchemaChangeJobTest() {
6568
/**
 * Static fixture setup for this test class (the lifecycle annotation lies outside this excerpt).
 *
 * Creates a minimal StarRocks cluster in {@code RunMode.SHARED_DATA}, stores a default connect
 * context in {@code connectContext}, then stops the schema change handler and waits until it
 * reports that it is no longer running.
 *
 * @throws Exception if cluster/context creation fails or the waiting thread is interrupted
 */
public static void setUp() throws Exception {
6669
UtFrameUtils.createMinStarRocksCluster(RunMode.SHARED_DATA);
6770
connectContext = UtFrameUtils.createDefaultCtx();
71+
// Schema change job is executed manually, so stop the schema change daemon to avoid interfering with the test.
72+
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getAlterJobMgr().getSchemaChangeHandler();
73+
// Request a stop, then interrupt the handler.
// NOTE(review): the interrupt presumably wakes the daemon from its sleep so it notices the stop flag — confirm.
schemaChangeHandler.setStop();
74+
schemaChangeHandler.interrupt();
75+
long startTime = System.currentTimeMillis();
76+
// Poll every 100 ms until the handler reports it has stopped; the elapsed-time check runs after
// each sleep and fails the setup once 60 seconds have passed.
while (schemaChangeHandler.isRunning()) {
77+
Thread.sleep(100);
78+
Assertions.assertTrue(System.currentTimeMillis() - startTime < 60000,
79+
"Schema change handler is not stopped in 60 seconds");
80+
}
6881
}
6982

7083
private static LakeTable createTable(ConnectContext connectContext, String sql) throws Exception {
@@ -489,6 +502,56 @@ public boolean publishVersion() {
489502
Assertions.assertEquals(AlterJobV2.JobState.FINISHED, schemaChangeJob.getJobState());
490503
}
491504

505+
/**
 * Verifies that the schema change job retries publishing after a failed publish attempt.
 *
 * Flow: fake {@code lakePublishVersion()} to return false while counting calls, drive the job with
 * repeated {@code run()} calls until at least two publish attempts have been observed, then swap
 * the fake to return true and drive the job until it reaches FINISHED with the table back in
 * NORMAL state.
 *
 * NOTE(review): {@code schemaChangeJob} and {@code table} are fixtures defined outside this
 * excerpt; {@code MockUp}/{@code @Mock} look like JMockit fakes — confirm.
 */
@Test
506+
public void testPublishRetry() throws Exception {
507+
// Counts how many times the faked lakePublishVersion() has been invoked.
AtomicInteger numPublishRetry = new AtomicInteger(0);
508+
// Fake publish so that it always fails (returns false) and records each attempt.
new MockUp<LakeTableSchemaChangeJob>() {
509+
@Mock
510+
public boolean lakePublishVersion() {
511+
numPublishRetry.incrementAndGet();
512+
return false;
513+
}
514+
};
515+
516+
// Fake agent task dispatch: instead of sending, mark every task in the batch as finished.
new MockUp<LakeTableSchemaChangeJob>() {
517+
@Mock
518+
public void sendAgentTask(AgentBatchTask batchTask) {
519+
batchTask.getAllTasks().forEach(t -> t.setFinished(true));
520+
}
521+
};
522+
523+
// A single run() is expected to leave the job in FINISHED_REWRITING.
schemaChangeJob.run();
524+
Assertions.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, schemaChangeJob.getJobState());
525+
// 10-minute budget, applied separately to each of the two wait loops below.
long timeoutMs = 10 * 60 * 1000L;
526+
long deadline = System.currentTimeMillis() + timeoutMs;
527+
// Keep driving the job until the failing publish has been attempted at least twice,
// i.e. at least one retry happened after the first failure.
while (numPublishRetry.get() < 2) {
528+
schemaChangeJob.run();
529+
Thread.sleep(100);
530+
Assertions.assertTrue(System.currentTimeMillis() < deadline,
531+
() -> String.format("publish does not retry before timeout, job state: %s", schemaChangeJob.getJobState()));
532+
}
533+
534+
// Re-fake publish so that it now succeeds.
// NOTE(review): relies on this later MockUp taking precedence over the earlier lakePublishVersion fake — confirm.
new MockUp<LakeTableSchemaChangeJob>() {
535+
@Mock
536+
public boolean lakePublishVersion() {
537+
return true;
538+
}
539+
};
540+
541+
// Fresh deadline for the second phase: wait until the job is FINISHED and the table is NORMAL.
deadline = System.currentTimeMillis() + timeoutMs;
542+
while (schemaChangeJob.getJobState() != AlterJobV2.JobState.FINISHED
543+
|| table.getState() != OlapTable.OlapTableState.NORMAL) {
544+
schemaChangeJob.run();
545+
Thread.sleep(100);
546+
Assertions.assertTrue(System.currentTimeMillis() < deadline,
547+
() -> String.format("publish does not finish before timeout, job state: %s, table state: %s",
548+
schemaChangeJob.getJobState(), table.getState()));
549+
}
550+
// After the job finishes, the base schema must consist of exactly the columns c0 and c1, in that order.
Assertions.assertEquals(2, table.getBaseSchema().size());
551+
Assertions.assertEquals("c0", table.getBaseSchema().get(0).getName());
552+
Assertions.assertEquals("c1", table.getBaseSchema().get(1).getName());
553+
}
554+
492555
@Test
493556
public void testTransactionRaceCondition() throws AlterCancelException {
494557
new MockUp<LakeTableSchemaChangeJob>() {
@@ -516,7 +579,13 @@ public boolean isPreviousLoadFinished(long dbId, long tableId, long txnId) throw
516579
Exception exception = Assertions.assertThrows(AlterCancelException.class, () ->
517580
schemaChangeJob.runPendingJob());
518581
Assertions.assertTrue(exception.getMessage().contains(
519-
"concurrent transaction detected while adding shadow index, please re-run the alter table command"));
582+
"concurrent transaction detected while adding shadow index, please re-run the alter table command"),
583+
() -> {
584+
StringWriter sw = new java.io.StringWriter();
585+
PrintWriter pw = new java.io.PrintWriter(sw);
586+
exception.printStackTrace(pw);
587+
return sw.toString();
588+
});
520589
Assertions.assertEquals(AlterJobV2.JobState.PENDING, schemaChangeJob.getJobState());
521590
Assertions.assertEquals(10101L, schemaChangeJob.getWatershedTxnId());
522591

0 commit comments

Comments
 (0)