Skip to content

Commit cc66490

Browse files
authored
Refactor enrich policy resolution (#133928)
1 parent a0cd917 commit cc66490

File tree

4 files changed

+32
-25
lines changed

4 files changed

+32
-25
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,26 @@ public static UnresolvedPolicy from(Enrich e) {
110110
/**
111111
* Resolves a set of enrich policies
112112
*
113-
* @param unresolvedPolicies the unresolved policies
113+
* @param enriches the unresolved policies
114114
* @param executionInfo the execution info
115115
* @param listener notified with the enrich resolution
116116
*/
117-
public void resolvePolicies(
117+
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
118+
if (enriches.isEmpty()) {
119+
listener.onResponse(new EnrichResolution());
120+
return;
121+
}
122+
123+
doResolvePolicies(
124+
new HashSet<>(executionInfo.getClusters().keySet()),
125+
enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
126+
executionInfo,
127+
listener
128+
);
129+
}
130+
131+
protected void doResolvePolicies(
132+
Set<String> remoteClusters,
118133
Collection<UnresolvedPolicy> unresolvedPolicies,
119134
EsqlExecutionInfo executionInfo,
120135
ActionListener<EnrichResolution> listener
@@ -124,13 +139,10 @@ public void resolvePolicies(
124139
return;
125140
}
126141

127-
final Set<String> remoteClusters = new HashSet<>(executionInfo.getClusters().keySet());
128142
final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
129143
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
130144
final EnrichResolution enrichResolution = new EnrichResolution();
131-
132-
Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
133-
145+
final Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();
134146
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
135147
String clusterAlias = entry.getKey();
136148
if (entry.getValue().connectionError != null) {
@@ -424,17 +436,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
424436
new ChannelActionListener<>(channel),
425437
threadContext
426438
);
427-
try (
428-
RefCountingListener refs = new RefCountingListener(listener.map(unused -> new LookupResponse(resolvedPolices, failures)))
429-
) {
439+
try (var refs = new RefCountingListener(listener.map(unused -> new LookupResponse(resolvedPolices, failures)))) {
430440
for (String policyName : request.policyNames) {
431441
EnrichPolicy p = availablePolicies.get(policyName);
432442
if (p == null) {
433443
continue;
434444
}
435445
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
436446
String indexName = EnrichPolicy.getBaseName(policyName);
437-
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, refs.acquire(indexResult -> {
447+
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> {
438448
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
439449
EsIndex esIndex = indexResult.get();
440450
var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0));
@@ -449,17 +459,15 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
449459
} else {
450460
failures.put(policyName, indexResult.toString());
451461
}
452-
}), false);
462+
}));
453463
}
454464
}
455465
}
456466
}
457467
}
458468

459469
protected Map<String, EnrichPolicy> availablePolicies() {
460-
final EnrichMetadata metadata = projectResolver.getProjectMetadata(clusterService.state())
461-
.custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
462-
return metadata.getPolicies();
470+
return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies();
463471
}
464472

465473
protected void getRemoteConnection(String cluster, ActionListener<Transport.Connection> listener) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -373,12 +373,10 @@ public void analyzedPlan(
373373
}
374374

375375
PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed);
376-
var unresolvedPolicies = preAnalysis.enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).collect(toSet());
377-
378376
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
379377

380378
var listener = SubscribableListener. //
381-
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(unresolvedPolicies, executionInfo, l))
379+
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
382380
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
383381
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
384382
// first resolve the lookup indices, then the main indices
@@ -424,8 +422,8 @@ private void preAnalyzeLookupIndex(
424422
patternWithRemotes,
425423
fieldNames,
426424
null,
427-
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution)),
428-
false
425+
false,
426+
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
429427
);
430428
}
431429

@@ -655,10 +653,10 @@ private void preAnalyzeMainIndices(
655653
indexExpressionToResolve,
656654
result.fieldNames,
657655
requestFilter,
656+
includeAllDimensions,
658657
listener.delegateFailure((l, indexResolution) -> {
659658
l.onResponse(result.withIndexResolution(indexResolution));
660-
}),
661-
includeAllDimensions
659+
})
662660
);
663661
}
664662
} else {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public void resolveAsMergedMapping(
8181
String indexWildcard,
8282
Set<String> fieldNames,
8383
QueryBuilder requestFilter,
84-
ActionListener<IndexResolution> listener,
85-
boolean includeAllDimensions
84+
boolean includeAllDimensions,
85+
ActionListener<IndexResolution> listener
8686
) {
8787
client.execute(
8888
EsqlResolveFieldsAction.TYPE,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.ArrayList;
5252
import java.util.Collection;
5353
import java.util.HashMap;
54+
import java.util.HashSet;
5455
import java.util.List;
5556
import java.util.Map;
5657
import java.util.Set;
@@ -434,7 +435,6 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver {
434435
}
435436

436437
EnrichResolution resolvePolicies(Collection<String> clusters, Collection<UnresolvedPolicy> unresolvedPolicies) {
437-
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
438438
EsqlExecutionInfo esqlExecutionInfo = new EsqlExecutionInfo(true);
439439
for (String cluster : clusters) {
440440
esqlExecutionInfo.swapCluster(cluster, (k, v) -> new EsqlExecutionInfo.Cluster(cluster, "*"));
@@ -452,7 +452,8 @@ EnrichResolution resolvePolicies(Collection<String> clusters, Collection<Unresol
452452
unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values())));
453453
}
454454
}
455-
super.resolvePolicies(unresolvedPolicies, esqlExecutionInfo, future);
455+
PlainActionFuture<EnrichResolution> future = new PlainActionFuture<>();
456+
super.doResolvePolicies(new HashSet<>(clusters), unresolvedPolicies, esqlExecutionInfo, future);
456457
return future.actionGet(30, TimeUnit.SECONDS);
457458
}
458459

0 commit comments

Comments
 (0)