Skip to content

Commit d550b36

Browse files
[BugFix] Make TaskRun's status be compatible with lower version (backport #60438) (backport #60440) (#60447)
Signed-off-by: shuming.li <ming.moriarty@gmail.com> Co-authored-by: shuming.li <ming.moriarty@gmail.com>
1 parent 48bb46f commit d550b36

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,13 @@ public void saveTasksV2(ImageWriter imageWriter) throws IOException, SRMetaBlock
592592

593593
writer.writeInt(runStatusList.size());
594594
for (TaskRunStatus status : runStatusList) {
595+
// TODO: This is to be compatible with the old version of TaskRunStatus when downgraded from a higher version
596+
// because SKIPPED state is not defined in the lower version.
597+
// NOTE: This can be removed in the next 3.4 version release.
598+
if (status.getState() != null && status.getState().equals(Constants.TaskRunState.SKIPPED)) {
599+
status.setState(Constants.TaskRunState.SUCCESS);
600+
LOG.warn("TaskRunStatus state is SKIPPED, change to SUCCESS, status: {}", status);
601+
}
595602
writer.writeJson(status);
596603
}
597604

@@ -683,6 +690,22 @@ private boolean isSameTaskRunJob(TaskRunStatus taskRunStatus,
683690
}
684691

685692
public void replayCreateTaskRun(TaskRunStatus status) {
693+
try {
694+
doReplayCreateTaskRun(status);
695+
} catch (Exception e) {
696+
LOG.warn("replay create task run failed, status: {}, error: {}", status, e.getMessage());
697+
// The task run will be replayed in FE restart, If the replay fails, it will cause FE restart failed.
698+
// It's fine to discard the task run since it's only task's history records and can be retried later.
699+
}
700+
}
701+
702+
private void doReplayCreateTaskRun(TaskRunStatus status) {
703+
// NOTE: If the current FE is downgraded from a higher version and TaskRunStatus#State has a newly added value that is not defined
704+
// in current version, status.getState() will be null.
705+
if (status == null || status.getState() == null || Strings.isNullOrEmpty(status.getTaskName())) {
706+
LOG.warn("replayCreateTaskRun: status is null or taskId is invalid, status: {}", status);
707+
return;
708+
}
686709
if (status.getState().isFinishState() && System.currentTimeMillis() > status.getExpireTime()) {
687710
return;
688711
}

fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import com.starrocks.common.util.ThreadUtil;
2727
import com.starrocks.common.util.TimeUtils;
2828
import com.starrocks.common.util.UUIDUtil;
29+
import com.starrocks.persist.ImageWriter;
30+
import com.starrocks.persist.metablock.SRMetaBlockReader;
2931
import com.starrocks.qe.ConnectContext;
32+
import com.starrocks.scheduler.history.TaskRunHistory;
3033
import com.starrocks.scheduler.persist.TaskRunStatus;
3134
import com.starrocks.scheduler.persist.TaskRunStatusChange;
3235
import com.starrocks.server.GlobalStateMgr;
@@ -886,4 +889,49 @@ public void testTaskRunWithLargeDefinition3() {
886889
String definition = taskRun1.getStatus().getDefinition();
887890
Assert.assertTrue(definition == null);
888891
}
892+
893+
@Test
894+
public void saveTasksV2SkipsSkippedTaskRunStatuses() throws Exception {
895+
UtFrameUtils.PseudoImage image = new UtFrameUtils.PseudoImage();
896+
{
897+
TaskManager taskManager = new TaskManager();
898+
ImageWriter imageWriter = image.getImageWriter();
899+
900+
Task task = new Task("task");
901+
task.setId(1L);
902+
taskManager.replayCreateTask(task);
903+
904+
TaskRunStatus skippedStatus = new TaskRunStatus();
905+
skippedStatus.setTaskId(1);
906+
skippedStatus.setQueryId("task_run_1");
907+
skippedStatus.setTaskName("task_run_1");
908+
skippedStatus.setState(Constants.TaskRunState.SKIPPED);
909+
skippedStatus.setExpireTime(System.currentTimeMillis() + 1000000);
910+
taskManager.replayCreateTaskRun(skippedStatus);
911+
912+
TaskRunStatus validStatus = new TaskRunStatus();
913+
validStatus.setTaskId(2);
914+
validStatus.setQueryId("task_run_2");
915+
validStatus.setTaskName("task_run_2");
916+
validStatus.setState(Constants.TaskRunState.SUCCESS);
917+
validStatus.setExpireTime(System.currentTimeMillis() + 1000000);
918+
taskManager.replayCreateTaskRun(validStatus);
919+
920+
TaskRunHistory taskRunHistory = taskManager.getTaskRunHistory();
921+
Assert.assertEquals(2, taskRunHistory.getTaskRunCount());
922+
923+
taskManager.saveTasksV2(imageWriter);
924+
}
925+
926+
SRMetaBlockReader imageReader = image.getMetaBlockReader();
927+
{
928+
TaskManager taskManager = new TaskManager();
929+
taskManager.loadTasksV2(imageReader);
930+
TaskRunHistory taskRunHistory = taskManager.getTaskRunHistory();
931+
Assert.assertEquals(2, taskRunHistory.getTaskRunCount());
932+
taskRunHistory.getInMemoryHistory()
933+
.stream()
934+
.forEach(status -> Assert.assertEquals(status.getState(), Constants.TaskRunState.SUCCESS));
935+
}
936+
}
889937
}

0 commit comments

Comments
 (0)