Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
Expand Down Expand Up @@ -107,6 +109,8 @@
import java.util.Objects;
import java.util.function.Supplier;

import static org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer;

/**
* Configures classes and services that affect the entire cluster.
*/
Expand Down Expand Up @@ -171,7 +175,8 @@ public ClusterModule(
this::reconcile,
writeLoadForecaster,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
this::explainShardAllocation
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
Expand Down Expand Up @@ -237,6 +242,10 @@ private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerout
return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy);
}

private ShardAllocationDecision explainShardAllocation(ShardRouting shardRouting, RoutingAllocation allocation) {
return allocationService.explainShardAllocation(shardRouting, allocation);
}

public static List<Entry> getNamedWriteables() {
List<Entry> entries = new ArrayList<>();
// Cluster State
Expand Down Expand Up @@ -489,7 +498,8 @@ private static ShardsAllocator createShardsAllocator(
DesiredBalanceReconcilerAction reconciler,
WriteLoadForecaster writeLoadForecaster,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
ShardAllocationExplainer shardAllocationExplainer
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
Expand All @@ -505,7 +515,8 @@ private static ShardsAllocator createShardsAllocator(
clusterService,
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
nodeAllocationStatsAndWeightsCalculator,
shardAllocationExplainer
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.time.TimeProvider;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -39,6 +42,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableSet;

Expand All @@ -48,9 +52,13 @@
public class DesiredBalanceComputer {

private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);
private static final Logger allocationExplainLogger = LogManager.getLogger(
DesiredBalanceComputer.class.getCanonicalName() + ".allocation_explain"
);

private final ShardsAllocator delegateAllocator;
private final TimeProvider timeProvider;
private final ShardAllocationExplainer shardAllocationExplainer;

// stats
protected final MeanMetric iterations = new MeanMetric();
Expand All @@ -77,10 +85,17 @@ public class DesiredBalanceComputer {
private long lastConvergedTimeMillis;
private long lastNotConvergedLogMessageTimeMillis;
private Level convergenceLogMsgLevel;
private ShardRouting lastTrackedUnassignedShard;

public DesiredBalanceComputer(ClusterSettings clusterSettings, TimeProvider timeProvider, ShardsAllocator delegateAllocator) {
public DesiredBalanceComputer(
ClusterSettings clusterSettings,
TimeProvider timeProvider,
ShardsAllocator delegateAllocator,
ShardAllocationExplainer shardAllocationExplainer
) {
this.delegateAllocator = delegateAllocator;
this.timeProvider = timeProvider;
this.shardAllocationExplainer = shardAllocationExplainer;
this.numComputeCallsSinceLastConverged = 0;
this.numIterationsSinceLastConverged = 0;
this.lastConvergedTimeMillis = timeProvider.relativeTimeInMillis();
Expand Down Expand Up @@ -462,10 +477,59 @@ public DesiredBalance compute(
);
}

maybeLogAllocationExplainForUnassigned(finishReason, routingNodes, routingAllocation);

long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
}

private void maybeLogAllocationExplainForUnassigned(
DesiredBalance.ComputationFinishReason finishReason,
RoutingNodes routingNodes,
RoutingAllocation routingAllocation
) {
if (allocationExplainLogger.isDebugEnabled()) {
Copy link
Contributor

@DiannaHohensee DiannaHohensee Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a plan to temporarily turn debug on for the new logger in serverless? Turning it on after the fact in an incident won't be useful, I expect.

I'd be inclined to log at INFO. CONVERGED with unassigned shards shouldn't happen (except for our mystery bug). Unless decider settings are too strict (not expected in serverless), and then the same shard will be unassigned and shouldn't flood the logs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the plan is to enable the debug log in serverless. I raised ES-12842 for it. I prefer the DEBUG level since it affects stateful as well.

if (lastTrackedUnassignedShard != null) {
if (Stream.concat(routingNodes.unassigned().stream(), routingNodes.unassigned().ignored().stream())
.noneMatch(shardRouting -> shardRouting.equals(lastTrackedUnassignedShard))) {
allocationExplainLogger.debug("previously tracked unassigned shard [{}] is now assigned", lastTrackedUnassignedShard);
lastTrackedUnassignedShard = null;
} else {
return; // The last tracked unassigned shard is still unassigned, keep tracking it
}
}

assert lastTrackedUnassignedShard == null : "unexpected non-null lastTrackedUnassignedShard " + lastTrackedUnassignedShard;
if (routingNodes.hasUnassignedShards() && finishReason == DesiredBalance.ComputationFinishReason.CONVERGED) {
final Predicate<ShardRouting> predicate = routingNodes.hasUnassignedPrimaries() ? ShardRouting::primary : shard -> true;
lastTrackedUnassignedShard = Stream.concat(routingNodes.unassigned().stream(), routingNodes.unassigned().ignored().stream())
.filter(predicate)
.findFirst()
.orElseThrow();

final var originalDebugMode = routingAllocation.getDebugMode();
routingAllocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
final ShardAllocationDecision shardAllocationDecision;
try {
shardAllocationDecision = shardAllocationExplainer.explain(lastTrackedUnassignedShard, routingAllocation);
} finally {
routingAllocation.setDebugMode(originalDebugMode);
}
allocationExplainLogger.debug(
"unassigned shard [{}] with allocation decision {}",
lastTrackedUnassignedShard,
org.elasticsearch.common.Strings.toString(
p -> ChunkedToXContentHelper.object("node_allocation_decision", shardAllocationDecision.toXContentChunked(p))
)
);
}
} else {
if (lastTrackedUnassignedShard != null) {
lastTrackedUnassignedShard = null;
}
}
}

// visible for testing
boolean hasEnoughIterations(int currentIteration) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,26 @@ public interface DesiredBalanceReconcilerAction {
ClusterState apply(ClusterState clusterState, RerouteStrategy rerouteStrategy);
}

@FunctionalInterface
public interface ShardAllocationExplainer {
ShardAllocationDecision explain(ShardRouting shard, RoutingAllocation allocation);
}

public DesiredBalanceShardsAllocator(
ClusterSettings clusterSettings,
ShardsAllocator delegateAllocator,
ThreadPool threadPool,
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
ShardAllocationExplainer shardAllocationExplainer
) {
this(
delegateAllocator,
threadPool,
clusterService,
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator, shardAllocationExplainer),
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testDeleteDesiredBalance() throws Exception {
var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);

var delegate = new BalancedShardsAllocator();
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) {
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate, TEST_ONLY_EXPLAINER) {

final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ public void testUndesiredShardCount() {
clusterService,
(innerState, strategy) -> innerState,
TelemetryProvider.NOOP,
EMPTY_NODE_ALLOCATION_STATS
EMPTY_NODE_ALLOCATION_STATS,
TEST_ONLY_EXPLAINER
) {
@Override
public DesiredBalance getDesiredBalance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ private Map.Entry<MockAllocationService, ShardsAllocator> createNewAllocationSer
(clusterState, routingAllocationAction) -> strategyRef.get()
.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", routingAllocationAction),
TelemetryProvider.NOOP,
EMPTY_NODE_ALLOCATION_STATS
EMPTY_NODE_ALLOCATION_STATS,
TEST_ONLY_EXPLAINER
) {
@Override
public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) {
Expand Down
Loading