Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/alter/OptimizeJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.SessionVariable;
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.SubmitResult;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.TaskRun;
Expand Down Expand Up @@ -289,7 +290,12 @@ protected void runWaitingTxnJob() throws AlterCancelException {
for (OptimizeTask rewriteTask : rewriteTasks) {
try {
taskManager.createTask(rewriteTask, false);
taskManager.executeTask(rewriteTask.getName());
SubmitResult r = taskManager.executeTask(rewriteTask.getName());
if (r.getStatus() == SubmitResult.SubmitStatus.SUBMITTED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.RUNNING);
} else if (r.getStatus() == SubmitResult.SubmitStatus.FAILED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.FAILED);
}
LOG.debug("create rewrite task {}", rewriteTask.toString());
} catch (DdlException e) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.FAILED);
Expand Down Expand Up @@ -332,6 +338,7 @@ protected void runRunningJob() throws AlterCancelException {
int progress = 0;
TaskRunManager taskRunManager = GlobalStateMgr.getCurrentState().getTaskManager().getTaskRunManager();
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager(); // add: define taskManager

// prepare for the history task info
Set<String> taskNames = Sets.newHashSet();
Expand All @@ -342,6 +349,14 @@ protected void runRunningJob() throws AlterCancelException {
.getTaskRunManager().getTaskRunHistory().lookupHistoryByTaskNames(dbName, taskNames);

for (OptimizeTask rewriteTask : rewriteTasks) {
if (rewriteTask.getOptimizeTaskState() == Constants.TaskRunState.PENDING) {
SubmitResult r = taskManager.executeTask(rewriteTask.getName());
if (r.getStatus() == SubmitResult.SubmitStatus.SUBMITTED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.RUNNING);
} else if (r.getStatus() == SubmitResult.SubmitStatus.FAILED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.FAILED);
}
}
if (rewriteTask.getOptimizeTaskState() == Constants.TaskRunState.FAILED
|| rewriteTask.getOptimizeTaskState() == Constants.TaskRunState.SUCCESS) {
progress += 100 / rewriteTasks.size();
Expand Down
125 changes: 122 additions & 3 deletions fe/fe-core/src/test/java/com/starrocks/alter/OptimizeJobV2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.starrocks.common.Config;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.DDLTestBase;
Expand Down Expand Up @@ -479,9 +480,7 @@ public void testOptimizeFailedByVersion() throws Exception {

// runWaitingTxnJob
optimizeJob.runWaitingTxnJob();
if (optimizeJob.getJobState() != JobState.RUNNING) {
return;
}
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());

// runRunningJob
List<OptimizeTask> optimizeTasks = optimizeJob.getOptimizeTasks();
Expand All @@ -503,4 +502,124 @@ public void testOptimizeFailedByVersion() throws Exception {
Assert.assertEquals(JobState.CANCELLED, optimizeJob.getJobState());
}

@Test
public void testOptimizeDistributionTypeSuccess() throws Exception {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(GlobalStateMgrTestUtil.testDb1);
OlapTable olapTable =
(OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), "testTable2");

String stmt = "alter table testTable2 distributed by random";
AlterTableStmt alterStmt = (AlterTableStmt) UtFrameUtils.parseStmtWithNewParser(stmt, starRocksAssert.getCtx());
schemaChangeHandler.process(alterStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assertions.assertEquals(1, alterJobsV2.size());
OptimizeJobV2 optimizeJob = (OptimizeJobV2) alterJobsV2.values().stream().findAny().get();

// runPendingJob
optimizeJob.runPendingJob();
Assertions.assertEquals(JobState.WAITING_TXN, optimizeJob.getJobState());

// runWaitingTxnJob
optimizeJob.runWaitingTxnJob();
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());

// Make all tasks SUCCESS to cover allPartitionOptimized branch
List<OptimizeTask> optimizeTasks = optimizeJob.getOptimizeTasks();
// Expect 2 tasks (2 partitions in test env)
Assertions.assertEquals(2, optimizeTasks.size());
for (OptimizeTask t : optimizeTasks) {
t.setOptimizeTaskState(Constants.TaskRunState.SUCCESS);
}

try {
optimizeJob.runRunningJob();
} catch (Exception e) {
LOG.info(e.getMessage());
}

// Verify job finished and default distribution updated
Assertions.assertEquals(JobState.FINISHED, optimizeJob.getJobState());
Assertions.assertEquals(OlapTable.OlapTableState.NORMAL, olapTable.getState());
Assertions.assertEquals(
com.starrocks.catalog.DistributionInfo.DistributionInfoType.RANDOM,
olapTable.getDefaultDistributionInfo().getType()
);
}

@Test
public void testRunRunningJobSubmitPendingTasks() throws Exception {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(GlobalStateMgrTestUtil.testDb1);
OlapTable olapTable = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getFullName(), GlobalStateMgrTestUtil.testTable7);

// Drive job to PENDING -> WAITING_TXN -> RUNNING
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assertions.assertEquals(1, alterJobsV2.size());
OptimizeJobV2 optimizeJob = (OptimizeJobV2) alterJobsV2.values().stream().findAny().get();

optimizeJob.runPendingJob();
Assertions.assertEquals(JobState.WAITING_TXN, optimizeJob.getJobState());

optimizeJob.runWaitingTxnJob();
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());

// Set all tasks to PENDING and clear scheduler state to trigger executeTask path in runRunningJob
List<OptimizeTask> optimizeTasks = optimizeJob.getOptimizeTasks();
for (OptimizeTask t : optimizeTasks) {
t.setOptimizeTaskState(Constants.TaskRunState.PENDING);
GlobalStateMgr.getCurrentState().getTaskManager().getTaskRunManager()
.getTaskRunScheduler().removeRunningTask(t.getId());
GlobalStateMgr.getCurrentState().getTaskManager().getTaskRunManager()
.getTaskRunScheduler().removePendingTask(t);
}

// Trigger path: executeTask for PENDING tasks should set state to RUNNING or FAILED
optimizeJob.runRunningJob();

// Assert: all tasks should not be PENDING
for (OptimizeTask t : optimizeTasks) {
Assertions.assertNotEquals(Constants.TaskRunState.PENDING, t.getOptimizeTaskState());
}
// Job should remain RUNNING because tasks are not finished
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
}

@Test
public void testRunRunningJobSubmitPendingTasksFailed() throws Exception {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(GlobalStateMgrTestUtil.testDb1);
OlapTable olapTable = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getFullName(), GlobalStateMgrTestUtil.testTable7);

// Drive job to PENDING -> WAITING_TXN -> RUNNING
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assertions.assertEquals(1, alterJobsV2.size());
OptimizeJobV2 optimizeJob = (OptimizeJobV2) alterJobsV2.values().stream().findAny().get();

optimizeJob.runPendingJob();
Assertions.assertEquals(JobState.WAITING_TXN, optimizeJob.getJobState());

optimizeJob.runWaitingTxnJob();
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());

// Create a fake PENDING task that is not registered in TaskManager to force executeTask -> FAILED
String fakeTaskName = optimizeJob.getName() + "_fake_pending";
OptimizeTask fakeTask = TaskBuilder.buildOptimizeTask(fakeTaskName, optimizeJob.getProperties(),
"select 1", db.getFullName(), 0L);
fakeTask.setOptimizeTaskState(Constants.TaskRunState.PENDING);
optimizeJob.getOptimizeTasks().add(fakeTask);

// Trigger runRunningJob: PENDING task should try to execute and become FAILED
optimizeJob.runRunningJob();

// Verify the fake task failed due to executeTask returning FAILED
Assertions.assertEquals(Constants.TaskRunState.FAILED, fakeTask.getOptimizeTaskState());
// Job should remain RUNNING because other tasks are not finished
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
}

}
Loading