17
17
import org .elasticsearch .cluster .routing .ShardRouting ;
18
18
import org .elasticsearch .cluster .routing .UnassignedInfo ;
19
19
import org .elasticsearch .cluster .routing .allocation .RoutingAllocation ;
20
+ import org .elasticsearch .cluster .routing .allocation .ShardAllocationDecision ;
21
+ import org .elasticsearch .cluster .routing .allocation .allocator .DesiredBalanceShardsAllocator .ShardAllocationExplainer ;
20
22
import org .elasticsearch .cluster .routing .allocation .command .MoveAllocationCommand ;
21
23
import org .elasticsearch .cluster .routing .allocation .decider .Decision ;
22
24
import org .elasticsearch .common .metrics .MeanMetric ;
23
25
import org .elasticsearch .common .settings .ClusterSettings ;
24
26
import org .elasticsearch .common .settings .Setting ;
25
27
import org .elasticsearch .common .time .TimeProvider ;
26
28
import org .elasticsearch .common .util .Maps ;
29
+ import org .elasticsearch .common .xcontent .ChunkedToXContentHelper ;
27
30
import org .elasticsearch .core .Strings ;
28
31
import org .elasticsearch .core .TimeValue ;
29
32
import org .elasticsearch .index .shard .ShardId ;
39
42
import java .util .TreeMap ;
40
43
import java .util .TreeSet ;
41
44
import java .util .function .Predicate ;
45
+ import java .util .stream .Stream ;
42
46
43
47
import static java .util .stream .Collectors .toUnmodifiableSet ;
44
48
48
52
public class DesiredBalanceComputer {
49
53
50
54
private static final Logger logger = LogManager .getLogger (DesiredBalanceComputer .class );
55
+ private static final Logger allocationExplainLogger = LogManager .getLogger (
56
+ DesiredBalanceComputer .class .getCanonicalName () + ".allocation_explain"
57
+ );
51
58
52
59
private final ShardsAllocator delegateAllocator ;
53
60
private final TimeProvider timeProvider ;
61
+ private final ShardAllocationExplainer shardAllocationExplainer ;
54
62
55
63
// stats
56
64
protected final MeanMetric iterations = new MeanMetric ();
@@ -77,10 +85,17 @@ public class DesiredBalanceComputer {
77
85
private long lastConvergedTimeMillis ;
78
86
private long lastNotConvergedLogMessageTimeMillis ;
79
87
private Level convergenceLogMsgLevel ;
88
+ private ShardRouting lastTrackedUnassignedShard ;
80
89
81
- public DesiredBalanceComputer (ClusterSettings clusterSettings , TimeProvider timeProvider , ShardsAllocator delegateAllocator ) {
90
+ public DesiredBalanceComputer (
91
+ ClusterSettings clusterSettings ,
92
+ TimeProvider timeProvider ,
93
+ ShardsAllocator delegateAllocator ,
94
+ ShardAllocationExplainer shardAllocationExplainer
95
+ ) {
82
96
this .delegateAllocator = delegateAllocator ;
83
97
this .timeProvider = timeProvider ;
98
+ this .shardAllocationExplainer = shardAllocationExplainer ;
84
99
this .numComputeCallsSinceLastConverged = 0 ;
85
100
this .numIterationsSinceLastConverged = 0 ;
86
101
this .lastConvergedTimeMillis = timeProvider .relativeTimeInMillis ();
@@ -462,10 +477,59 @@ public DesiredBalance compute(
462
477
);
463
478
}
464
479
480
+ maybeLogAllocationExplainForUnassigned (finishReason , routingNodes , routingAllocation );
481
+
465
482
long lastConvergedIndex = hasChanges ? previousDesiredBalance .lastConvergedIndex () : desiredBalanceInput .index ();
466
483
return new DesiredBalance (lastConvergedIndex , assignments , routingNodes .getBalanceWeightStatsPerNode (), finishReason );
467
484
}
468
485
486
+ private void maybeLogAllocationExplainForUnassigned (
487
+ DesiredBalance .ComputationFinishReason finishReason ,
488
+ RoutingNodes routingNodes ,
489
+ RoutingAllocation routingAllocation
490
+ ) {
491
+ if (allocationExplainLogger .isDebugEnabled ()) {
492
+ if (lastTrackedUnassignedShard != null ) {
493
+ if (Stream .concat (routingNodes .unassigned ().stream (), routingNodes .unassigned ().ignored ().stream ())
494
+ .noneMatch (shardRouting -> shardRouting .equals (lastTrackedUnassignedShard ))) {
495
+ allocationExplainLogger .debug ("previously tracked unassigned shard [{}] is now assigned" , lastTrackedUnassignedShard );
496
+ lastTrackedUnassignedShard = null ;
497
+ } else {
498
+ return ; // The last tracked unassigned shard is still unassigned, keep tracking it
499
+ }
500
+ }
501
+
502
+ assert lastTrackedUnassignedShard == null : "unexpected non-null lastTrackedUnassignedShard " + lastTrackedUnassignedShard ;
503
+ if (routingNodes .hasUnassignedShards () && finishReason == DesiredBalance .ComputationFinishReason .CONVERGED ) {
504
+ final Predicate <ShardRouting > predicate = routingNodes .hasUnassignedPrimaries () ? ShardRouting ::primary : shard -> true ;
505
+ lastTrackedUnassignedShard = Stream .concat (routingNodes .unassigned ().stream (), routingNodes .unassigned ().ignored ().stream ())
506
+ .filter (predicate )
507
+ .findFirst ()
508
+ .orElseThrow ();
509
+
510
+ final var originalDebugMode = routingAllocation .getDebugMode ();
511
+ routingAllocation .setDebugMode (RoutingAllocation .DebugMode .EXCLUDE_YES_DECISIONS );
512
+ final ShardAllocationDecision shardAllocationDecision ;
513
+ try {
514
+ shardAllocationDecision = shardAllocationExplainer .explain (lastTrackedUnassignedShard , routingAllocation );
515
+ } finally {
516
+ routingAllocation .setDebugMode (originalDebugMode );
517
+ }
518
+ allocationExplainLogger .debug (
519
+ "unassigned shard [{}] with allocation decision {}" ,
520
+ lastTrackedUnassignedShard ,
521
+ org .elasticsearch .common .Strings .toString (
522
+ p -> ChunkedToXContentHelper .object ("node_allocation_decision" , shardAllocationDecision .toXContentChunked (p ))
523
+ )
524
+ );
525
+ }
526
+ } else {
527
+ if (lastTrackedUnassignedShard != null ) {
528
+ lastTrackedUnassignedShard = null ;
529
+ }
530
+ }
531
+ }
532
+
469
533
// visible for testing
470
534
boolean hasEnoughIterations (int currentIteration ) {
471
535
return true ;
0 commit comments