Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -26,6 +23,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
Expand All @@ -36,6 +34,8 @@
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -45,19 +45,30 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static java.util.stream.IntStream.range;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {

@Override
@SuppressWarnings("unchecked")
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
return CollectionUtils.appendToCopyNoNullElements(
super.nodePlugins(),
MockTransportService.TestPlugin.class,
TestTelemetryPlugin.class
);
}

/**
Expand Down Expand Up @@ -236,11 +247,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
*/

logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node");
final InternalClusterInfoService clusterInfoService = asInstanceOf(
InternalClusterInfoService.class,
internalCluster().getInstance(ClusterInfoService.class, masterName)
);
ClusterInfoServiceUtils.refresh(clusterInfoService);
refreshClusterInfo();

logger.info(
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
Expand All @@ -263,6 +270,57 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
}));
}

public void testMaxQueueLatencyMetricIsPublished() {
final Settings settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
final var dataNodes = internalCluster().startNodes(3, settings);

// Refresh cluster info (should trigger polling)
refreshClusterInfo();

Map<String, Long> mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics(dataNodes);
assertThat(mostRecentQueueLatencyMetrics.keySet(), hasSize(dataNodes.size()));
assertThat(mostRecentQueueLatencyMetrics.values(), everyItem(greaterThanOrEqualTo(0L)));

final String dataNodeToDelay = randomFrom(dataNodes);
final ThreadPool threadPoolToDelay = internalCluster().getInstance(ThreadPool.class, dataNodeToDelay);

// Fill the write thread pool and block a task for some time
final int writeThreadPoolSize = threadPoolToDelay.info(ThreadPool.Names.WRITE).getMax();
final var latch = new CountDownLatch(1);
final var writeThreadPool = threadPoolToDelay.executor(ThreadPool.Names.WRITE);
range(0, writeThreadPoolSize + 1).forEach(i -> writeThreadPool.execute(() -> safeAwait(latch)));
final long delayMillis = randomIntBetween(100, 200);
safeSleep(delayMillis);
// Unblock the pool
latch.countDown();

refreshClusterInfo();
mostRecentQueueLatencyMetrics = getMostRecentQueueLatencyMetrics(dataNodes);
assertThat(mostRecentQueueLatencyMetrics.keySet(), hasSize(dataNodes.size()));
assertThat(mostRecentQueueLatencyMetrics.get(dataNodeToDelay), greaterThanOrEqualTo(delayMillis));
}

private static Map<String, Long> getMostRecentQueueLatencyMetrics(List<String> dataNodes) {
final Map<String, Long> measurements = new HashMap<>();
for (String nodeName : dataNodes) {
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, nodeName);
final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();
telemetryPlugin.collect();
final var maxLatencyValues = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE
);
if (maxLatencyValues.isEmpty() == false) {
measurements.put(nodeName, maxLatencyValues.getLast().getLong());
}
}
return measurements;
}

/**
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,30 @@

package org.elasticsearch.action.admin.cluster.node.usage;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
* Collects some thread pool stats from each data node for purposes of shard allocation balancing. The specific stats are defined in
Expand All @@ -42,20 +45,21 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA
NodeUsageStatsForThreadPoolsAction.NodeResponse,
Void> {

private static final Logger logger = LogManager.getLogger(TransportNodeUsageStatsForThreadPoolsAction.class);

public static final String NAME = "internal:monitor/thread_pool/stats";
public static final ActionType<NodeUsageStatsForThreadPoolsAction.Response> TYPE = new ActionType<>(NAME);
private static final int NO_VALUE = -1;

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final AtomicLong lastMaxQueueLatencyMillis = new AtomicLong(NO_VALUE);

@Inject
public TransportNodeUsageStatsForThreadPoolsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters
ActionFilters actionFilters,
DesiredBalanceMetrics desiredBalanceMetrics
) {
super(
NAME,
Expand All @@ -67,6 +71,7 @@ public TransportNodeUsageStatsForThreadPoolsAction(
);
this.threadPool = threadPool;
this.clusterService = clusterService;
desiredBalanceMetrics.registerWriteLoadDeciderMaxLatencyGauge(this::getMaxQueueLatencyMetric);
}

@Override
Expand Down Expand Up @@ -99,15 +104,17 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
var trackingForWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;

long maxQueueLatencyMillis = Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
);
lastMaxQueueLatencyMillis.set(maxQueueLatencyMillis);
ThreadPoolUsageStats threadPoolUsageStats = new ThreadPoolUsageStats(
trackingForWriteExecutor.getMaximumPoolSize(),
(float) trackingForWriteExecutor.pollUtilization(
TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
),
Math.max(
trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
)
maxQueueLatencyMillis
);

Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();
Expand All @@ -117,4 +124,13 @@ protected NodeUsageStatsForThreadPoolsAction.NodeResponse nodeOperation(
new NodeUsageStatsForThreadPools(localNode.getId(), perThreadPool)
);
}

private Collection<LongWithAttributes> getMaxQueueLatencyMetric() {
long maxQueueLatencyValue = lastMaxQueueLatencyMillis.getAndSet(NO_VALUE);
if (maxQueueLatencyValue != NO_VALUE) {
return Set.of(new LongWithAttributes(maxQueueLatencyValue));
} else {
return Set.of();
}
}
}
16 changes: 10 additions & 6 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class ClusterModule extends AbstractModule {
private final ShardRoutingRoleStrategy shardRoutingRoleStrategy;
private final AllocationStatsService allocationStatsService;
private final TelemetryProvider telemetryProvider;
private final DesiredBalanceMetrics desiredBalanceMetrics;

public ClusterModule(
Settings settings,
Expand All @@ -164,6 +166,7 @@ public ClusterModule(
writeLoadForecaster,
balancingWeightsFactory
);
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
this.shardsAllocator = createShardsAllocator(
settings,
clusterService.getClusterSettings(),
Expand All @@ -174,9 +177,9 @@ public ClusterModule(
clusterService,
this::reconcile,
writeLoadForecaster,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator,
this::explainShardAllocation
this::explainShardAllocation,
desiredBalanceMetrics
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
Expand Down Expand Up @@ -497,9 +500,9 @@ private static ShardsAllocator createShardsAllocator(
ClusterService clusterService,
DesiredBalanceReconcilerAction reconciler,
WriteLoadForecaster writeLoadForecaster,
TelemetryProvider telemetryProvider,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
ShardAllocationExplainer shardAllocationExplainer
ShardAllocationExplainer shardAllocationExplainer,
DesiredBalanceMetrics desiredBalanceMetrics
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(
Expand All @@ -514,9 +517,9 @@ private static ShardsAllocator createShardsAllocator(
threadPool,
clusterService,
reconciler,
telemetryProvider,
nodeAllocationStatsAndWeightsCalculator,
shardAllocationExplainer
shardAllocationExplainer,
desiredBalanceMetrics
)
);

Expand Down Expand Up @@ -561,6 +564,7 @@ protected void configure() {
bind(ShardRoutingRoleStrategy.class).toInstance(shardRoutingRoleStrategy);
bind(AllocationStatsService.class).toInstance(allocationStatsService);
bind(TelemetryProvider.class).toInstance(telemetryProvider);
bind(DesiredBalanceMetrics.class).toInstance(desiredBalanceMetrics);
bind(MetadataRolloverService.class).asEagerSingleton();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;

/**
Expand All @@ -31,6 +33,8 @@
*/
public class DesiredBalanceMetrics {

public static DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP);

/**
* @param unassignedShards Shards that are not assigned to any node.
* @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role}
Expand Down Expand Up @@ -124,8 +128,12 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";

// Decider metrics
public static final String WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE = "es.allocator.deciders.write_load.max_latency_value.current";

public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(0, Map.of());

private final MeterRegistry meterRegistry;
private volatile boolean nodeIsMaster = false;

/**
Expand Down Expand Up @@ -153,6 +161,7 @@ public void updateMetrics(
}

public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
meterRegistry.registerLongsGauge(
UNASSIGNED_SHARDS_METRIC_NAME,
"Current number of unassigned shards",
Expand Down Expand Up @@ -260,6 +269,15 @@ public AllocationStats allocationStats() {
return lastReconciliationAllocationStats;
}

public void registerWriteLoadDeciderMaxLatencyGauge(Supplier<Collection<LongWithAttributes>> maxLatencySupplier) {
meterRegistry.registerLongsGauge(
WRITE_LOAD_DECIDER_MAX_LATENCY_VALUE,
"max latency for write load decider",
"ms",
maxLatencySupplier
);
}

private List<LongWithAttributes> getUnassignedShardsMetrics() {
return getIfPublishing(AllocationStats::unassignedShards);
}
Expand Down
Loading