Commit 1ce07c8
[BugFix] Fixed phased scheduler always waiting for profile collection in sync profile collection (backport #62140) (#62193)
Signed-off-by: stdpain <drfeng08@gmail.com>
Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
1 parent 1986515 commit 1ce07c8
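Summary of the change, as read from the diff below: when the phased scheduler failed to schedule a fragment, the query was cancelled with a plain INTERNAL_ERROR message, so the cancel path could not tell a schedule failure apart from an ordinary error and never force-completed the profile countdown; with synchronous profile collection enabled, the coordinator then kept waiting for profile reports from instances that were never dispatched. The fix prefixes such cancel messages with a dedicated constant so that cancel() can finish all profile instances, and additionally short-circuits cancel() once all results have already been returned.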

4 files changed: 31 additions, 5 deletions

fe/fe-core/src/main/java/com/starrocks/common/FeConstants.java

Lines changed: 2 additions & 0 deletions
@@ -73,6 +73,8 @@ public class FeConstants {
             "Compute node not found. Check if any compute node is down.";
     public static final String QUERY_FINISHED_ERROR = "QueryFinished";
     public static final String LIMIT_REACH_ERROR = "LimitReach";
+    public static final String SCHEDULE_FRAGMENT_ERROR =
+            "Schedule Fragment error. caused by:";
 
 
     public static boolean USE_MOCK_DICT_MANAGER = false;
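Note that SCHEDULE_FRAGMENT_ERROR is a message prefix rather than an exact-match sentinel: the scheduler appends the underlying exception text after it (see the DefaultCoordinator diff below). A minimal sketch of the resulting classification, using a hypothetical helper name — the real check lives inline in DefaultCoordinator.cancel():

    // Hypothetical helper, not part of the patch; it mirrors the inline check in
    // DefaultCoordinator.cancel(). BACKEND_NODE_NOT_FOUND_ERROR is a fixed sentinel,
    // so it is compared with equals(); SCHEDULE_FRAGMENT_ERROR is a prefix followed
    // by the underlying exception message, so it is compared with startsWith().
    static boolean shouldFinishAllProfileInstances(String cancelMessage) {
        return cancelMessage.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)
                || cancelMessage.startsWith(FeConstants.SCHEDULE_FRAGMENT_ERROR);
    }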

fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java

Lines changed: 9 additions & 4 deletions
@@ -585,7 +585,7 @@ public Status scheduleNextTurn(TUniqueId fragmentInstanceId) {
             scheduler.tryScheduleNextTurn(fragmentInstanceId);
         } catch (Exception e) {
             LOG.warn("schedule fragment:{} next internal error:", DebugUtil.printId(fragmentInstanceId), e);
-            cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, e.getMessage());
+            cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, FeConstants.SCHEDULE_FRAGMENT_ERROR + e.getMessage());
             return Status.internalError(e.getMessage());
         }
         return Status.OK;
@@ -1005,6 +1005,11 @@ public RowBatch getNext() throws Exception {
     public void cancel(PPlanFragmentCancelReason reason, String message) {
         lock();
         try {
+            // All results have been obtained. The query has ended. Ignore this error.
+            if (returnedAllResults) {
+                cancelInternal(PPlanFragmentCancelReason.QUERY_FINISHED);
+                return;
+            }
             if (!queryStatus.ok()) {
                 // we can't cancel twice
                 return;
@@ -1018,10 +1023,10 @@ public void cancel(PPlanFragmentCancelReason reason, String message) {
         try {
             // Disable count down profileDoneSignal for collect all backend's profile
             // but if backend has crashed, we need count down profileDoneSignal since it will not report by itself
-            if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
+            if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR) ||
+                    message.startsWith(FeConstants.SCHEDULE_FRAGMENT_ERROR)) {
                 queryProfile.finishAllInstances(Status.OK);
-                LOG.info("count down profileDoneSignal since backend has crashed, query id: {}",
-                        DebugUtil.printId(jobSpec.getQueryId()));
+
             }
         } finally {
             unlock();
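Why force-finishing matters here: with enable_profile set, the coordinator blocks on a countdown signal until every fragment instance reports its final profile or profile_timeout expires. An instance that failed to schedule never reports, so a cancel caused by a schedule failure must count the signal down for all instances. A minimal sketch of that pattern, assuming a simplified stand-in class — ProfileSignal is illustrative, not StarRocks' actual implementation:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Illustrative model of the profile-done-signal pattern (not the real class):
    // the coordinator waits for one count per fragment instance, and a cancel
    // caused by a schedule failure must count everything down, or the wait only
    // ends when the timeout expires.
    class ProfileSignal {
        private final CountDownLatch latch;

        ProfileSignal(int instanceCount) {
            this.latch = new CountDownLatch(instanceCount);
        }

        void onInstanceReported() {
            latch.countDown();     // normal path: a backend reports its profile
        }

        void finishAllInstances() {
            while (latch.getCount() > 0) {
                latch.countDown(); // forced path: these instances will never report
            }
        }

        boolean awaitProfiles(long timeoutSeconds) throws InterruptedException {
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        }
    }

Without the forced finishAllInstances(), awaitProfiles() in this model returns only after the full timeout, which is the hang the commit title describes.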

test/sql/test_phased_schedule/R/test_phased_schedule

Lines changed: 13 additions & 0 deletions
@@ -211,4 +211,17 @@
 with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2 union all select * from agged_table) tb;
 -- result:
 81921 1677762560.0 1677762560.0
+-- !result
+set enable_profile=true;
+-- result:
+-- !result
+set enable_async_profile=true;
+-- result:
+-- !result
+set profile_timeout=300;
+-- result:
+-- !result
+select count(*) from small_table1 s1 join small_table2 s2 on s1.c0 = s2.c0 join small_table3 s3 on s1.c0 = s3.c0 where s3.c3 = 0;
+-- result:
+0
 -- !result

test/sql/test_phased_schedule/T/test_phased_schedule

Lines changed: 7 additions & 1 deletion
@@ -118,4 +118,10 @@
 select count(*) from small_table1 s1 join small_table2 s2 on s1.c0 = s2.c0 join small_table3 s3 on s1.c0 = s3.c0 join small_table1 s4 on s1.c0 = s4.c0 join small_table2 s5 on s1.c0 = s5.c0 join small_table3 s6 on s1.c0 = s6.c0 join small_table1 s7 on s1.c0 = s7.c0 join small_table2 s8 on s1.c0 = s8.c0 join small_table3 s9 on s1.c0 = s9.c0 join small_table1 s10 on s1.c0 = s10.c0;
 -- union all with cte
 with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select * from agged_table union all select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2) tb;
-with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2 union all select * from agged_table) tb;
+with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2 union all select * from agged_table) tb;
+
+-- test with sync collect profile
+set enable_profile=true;
+set enable_async_profile=true;
+set profile_timeout=300;
+select count(*) from small_table1 s1 join small_table2 s2 on s1.c0 = s2.c0 join small_table3 s3 on s1.c0 = s3.c0 where s3.c3 = 0;
