Skip to content

Commit 938aa13

Browse files
committed
resolve remotes with field caps call
1 parent cc66490 commit 938aa13

File tree

4 files changed

+46
-14
lines changed

4 files changed

+46
-14
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.util.concurrent.ThreadContext;
2424
import org.elasticsearch.common.util.iterable.Iterables;
2525
import org.elasticsearch.core.Tuple;
26+
import org.elasticsearch.index.query.QueryBuilder;
2627
import org.elasticsearch.tasks.Task;
2728
import org.elasticsearch.threadpool.ThreadPool;
2829
import org.elasticsearch.transport.AbstractTransportRequest;
@@ -39,21 +40,24 @@
3940
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
4041
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
4142
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
43+
import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
4244
import org.elasticsearch.xpack.esql.core.type.DataType;
4345
import org.elasticsearch.xpack.esql.core.type.EsField;
4446
import org.elasticsearch.xpack.esql.core.util.StringUtils;
4547
import org.elasticsearch.xpack.esql.index.EsIndex;
48+
import org.elasticsearch.xpack.esql.index.MappingException;
4649
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
4750
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
51+
import org.elasticsearch.xpack.esql.plan.IndexPattern;
4852
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
53+
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
4954
import org.elasticsearch.xpack.esql.session.IndexResolver;
5055

5156
import java.io.IOException;
5257
import java.util.ArrayList;
5358
import java.util.Collection;
5459
import java.util.Collections;
5560
import java.util.HashMap;
56-
import java.util.HashSet;
5761
import java.util.List;
5862
import java.util.Map;
5963
import java.util.Set;
@@ -110,22 +114,37 @@ public static UnresolvedPolicy from(Enrich e) {
110114
/**
111115
* Resolves a set of enrich policies
112116
*
113-
* @param enriches the unresolved policies
117+
* @param preAnalysis to retrieve indices and enriches to resolve
118+
* @param requestFilter to resolve target clusters
114119
* @param executionInfo the execution info
115120
* @param listener notified with the enrich resolution
116121
*/
117-
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
118-
if (enriches.isEmpty()) {
122+
public void resolvePolicies(
123+
PreAnalyzer.PreAnalysis preAnalysis,
124+
QueryBuilder requestFilter,
125+
EsqlExecutionInfo executionInfo,
126+
ActionListener<EnrichResolution> listener
127+
) {
128+
if (preAnalysis.enriches.isEmpty()) {
119129
listener.onResponse(new EnrichResolution());
120130
return;
121131
}
122132

123-
doResolvePolicies(
124-
new HashSet<>(executionInfo.getClusters().keySet()),
125-
enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
126-
executionInfo,
127-
listener
128-
);
133+
doResolveRemotes(preAnalysis.indices, requestFilter, listener.delegateFailureAndWrap((l, remotes) -> {
134+
doResolvePolicies(remotes, preAnalysis.enriches.stream().map(UnresolvedPolicy::from).toList(), executionInfo, l);
135+
}));
136+
}
137+
138+
private void doResolveRemotes(List<IndexPattern> indexPatterns, QueryBuilder requestFilter, ActionListener<Set<String>> listener) {
139+
switch (indexPatterns.size()) {
140+
case 0 -> listener.onResponse(Set.of());
141+
case 1 -> indexResolver.resolveConcreteIndices(
142+
indexPatterns.getFirst().indexPattern(),
143+
requestFilter,
144+
listener.map(EsqlCCSUtils::getRemotesOf)
145+
);
146+
default -> listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
147+
}
129148
}
130149

131150
protected void doResolvePolicies(
@@ -442,7 +461,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
442461
if (p == null) {
443462
continue;
444463
}
445-
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
464+
try (var ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
446465
String indexName = EnrichPolicy.getBaseName(policyName);
447466
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> {
448467
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
import java.util.Map;
4141
import java.util.Objects;
4242
import java.util.Set;
43-
import java.util.stream.Collectors;
43+
44+
import static java.util.stream.Collectors.toSet;
4445

4546
public class EsqlCCSUtils {
4647

@@ -206,7 +207,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
206207
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
207208
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
208209
.map(Cluster::getClusterAlias)
209-
.collect(Collectors.toSet());
210+
.collect(toSet());
210211
for (String indexName : indexResolution.resolvedIndices()) {
211212
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
212213
}
@@ -414,4 +415,8 @@ public static String inClusterName(String clusterAlias) {
414415
return "in remote cluster [" + clusterAlias + "]";
415416
}
416417
}
418+
419+
public static Set<String> getRemotesOf(Set<String> concreteIndices) {
420+
return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet());
421+
}
417422
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void analyzedPlan(
376376
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);
377377

378378
var listener = SubscribableListener. //
379-
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
379+
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis, requestFilter, executionInfo, l))
380380
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
381381
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(parsed, preAnalysisResult, l));
382382
// first resolve the lookup indices, then the main indices

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ public IndexResolver(Client client) {
7474
this.client = client;
7575
}
7676

77+
public void resolveConcreteIndices(String indexPattern, QueryBuilder requestFilter, ActionListener<Set<String>> listener) {
78+
client.execute(
79+
EsqlResolveFieldsAction.TYPE,
80+
createFieldCapsRequest(indexPattern, Set.of("_id"), requestFilter, false),
81+
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Set.of(response.getIndices())))
82+
);
83+
}
84+
7785
/**
7886
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
7987
*/

0 commit comments

Comments
 (0)