Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.jar.JarInputStream;
import java.util.zip.ZipEntry;

Expand Down Expand Up @@ -548,23 +549,17 @@ public static List<List<Object>> getValuesList(EsqlQueryResponse results) {
}

public static List<List<Object>> getValuesList(Iterator<Iterator<Object>> values) {
var valuesList = new ArrayList<List<Object>>();
values.forEachRemaining(row -> {
var rowValues = new ArrayList<>();
row.forEachRemaining(rowValues::add);
valuesList.add(rowValues);
});
return valuesList;
return toList(values, row -> toList(row, Function.identity()));
}

public static List<List<Object>> getValuesList(Iterable<Iterable<Object>> values) {
var valuesList = new ArrayList<List<Object>>();
values.iterator().forEachRemaining(row -> {
var rowValues = new ArrayList<>();
row.iterator().forEachRemaining(rowValues::add);
valuesList.add(rowValues);
});
return valuesList;
return toList(values.iterator(), row -> toList(row.iterator(), Function.identity()));
}

private static <E, T> List<T> toList(Iterator<E> iterable, Function<E, T> transformer) {
var list = new ArrayList<T>();
iterable.forEachRemaining(e -> list.add(transformer.apply(e)));
return list;
}

public static List<String> withDefaultLimitWarning(List<String> warnings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ public void testLimitThenEnrichRemote() {
FROM *:events,events
| LIMIT 25
| eval ip= TO_STR(host)
| %s | KEEP host, timestamp, user, os
| %s
| KEEP host, timestamp, user, os
""", enrichHosts(Enrich.Mode.REMOTE));
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
var values = getValuesList(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/
public class PreAnalyzer {

public record PreAnalysis(IndexMode indexMode, IndexPattern index, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
public record PreAnalysis(IndexMode indexMode, IndexPattern indexPattern, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
public static final PreAnalysis EMPTY = new PreAnalysis(null, null, List.of(), List.of());
}

Expand All @@ -37,7 +37,7 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {
protected PreAnalysis doPreAnalyze(LogicalPlan plan) {

Holder<IndexMode> indexMode = new Holder<>();
Holder<IndexPattern> index = new Holder<>();
Holder<IndexPattern> indexPattern = new Holder<>();

List<Enrich> unresolvedEnriches = new ArrayList<>();
List<IndexPattern> lookupIndices = new ArrayList<>();
Expand All @@ -47,7 +47,7 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
lookupIndices.add(p.indexPattern());
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
indexMode.set(p.indexMode());
index.set(p.indexPattern());
indexPattern.set(p.indexPattern());
} else {
throw new IllegalStateException("index mode is already set");
}
Expand All @@ -58,6 +58,6 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
// mark plan as preAnalyzed (if it were marked, there would be no analysis)
plan.forEachUp(LogicalPlan::setPreAnalyzed);

return new PreAnalysis(indexMode.get(), index.get(), unresolvedEnriches, lookupIndices);
return new PreAnalysis(indexMode.get(), indexPattern.get(), unresolvedEnriches, lookupIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
Expand All @@ -39,21 +40,23 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.util.StringUtils;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.IndexResolver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,22 +113,33 @@ public static UnresolvedPolicy from(Enrich e) {
/**
* Resolves a set of enrich policies
*
* @param enriches the unresolved policies
* @param preAnalysis to retrieve indices and enriches to resolve
* @param requestFilter to resolve target clusters
* @param executionInfo the execution info
* @param listener notified with the enrich resolution
*/
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
if (enriches.isEmpty()) {
public void resolvePolicies(
PreAnalyzer.PreAnalysis preAnalysis,
QueryBuilder requestFilter,
EsqlExecutionInfo executionInfo,
ActionListener<EnrichResolution> listener
) {
if (preAnalysis.enriches().isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
}

doResolvePolicies(
new HashSet<>(executionInfo.getClusters().keySet()),
enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
executionInfo,
listener
);
doResolveRemotes(preAnalysis.indexPattern(), requestFilter, listener.delegateFailureAndWrap((l, remotes) -> {
doResolvePolicies(remotes, preAnalysis.enriches().stream().map(UnresolvedPolicy::from).toList(), executionInfo, l);
}));
}

private void doResolveRemotes(IndexPattern indexPattern, QueryBuilder requestFilter, ActionListener<Set<String>> listener) {
if (indexPattern != null) {
indexResolver.resolveConcreteIndices(indexPattern.indexPattern(), requestFilter, listener.map(EsqlCCSUtils::getRemotesOf));
} else {
listener.onResponse(Set.of());
}
}

protected void doResolvePolicies(
Expand Down Expand Up @@ -442,7 +456,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
if (p == null) {
continue;
}
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
try (var ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
String indexName = EnrichPolicy.getBaseName(policyName);
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toSet;

public class EsqlCCSUtils {

Expand Down Expand Up @@ -206,7 +207,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
.map(Cluster::getClusterAlias)
.collect(Collectors.toSet());
.collect(toSet());
for (String indexName : indexResolution.resolvedIndices()) {
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
}
Expand Down Expand Up @@ -322,10 +323,10 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
public static void initCrossClusterState(
IndicesExpressionGrouper indicesGrouper,
XPackLicenseState licenseState,
IndexPattern pattern,
IndexPattern indexPattern,
EsqlExecutionInfo executionInfo
) throws ElasticsearchStatusException {
if (pattern == null) {
if (indexPattern == null) {
return;
}
try {
Expand All @@ -334,7 +335,7 @@ public static void initCrossClusterState(
// it is copied here so that we have the same resolution when request contains multiple remote cluster patterns with *
Set.copyOf(indicesGrouper.getConfiguredClusters()),
IndicesOptions.DEFAULT,
pattern.indexPattern()
indexPattern.indexPattern()
);

executionInfo.clusterInfoInitializing(true);
Expand Down Expand Up @@ -413,4 +414,8 @@ public static String inClusterName(String clusterAlias) {
return "in remote cluster [" + clusterAlias + "]";
}
}

public static Set<String> getRemotesOf(Set<String> concreteIndices) {
return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ public void analyzedPlan(
}

var preAnalysis = preAnalyzer.preAnalyze(parsed);
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.index(), executionInfo);
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);

SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis, requestFilter, executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
Expand Down Expand Up @@ -632,12 +632,12 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
if (preAnalysis.index() != null) {
if (preAnalysis.indexPattern() != null) {
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(
result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of())))
result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of())))
);
} else {
boolean includeAllDimensions = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.TreeMap;
import java.util.TreeSet;

import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
import static org.elasticsearch.xpack.esql.core.type.DataType.OBJECT;
Expand Down Expand Up @@ -74,6 +75,16 @@ public IndexResolver(Client client) {
this.client = client;
}

public void resolveConcreteIndices(String indexPattern, QueryBuilder requestFilter, ActionListener<Set<String>> listener) {
client.execute(
EsqlResolveFieldsAction.TYPE,
createFieldCapsRequest(indexPattern, Set.of("_id"), requestFilter, false),
listener.delegateFailureAndWrap((l, response) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder though what happens here if one of the clusters fails. Will it fail this listener and thus the whole request? In other places, we have to do some handling to figure out what failed and skip failed clusters and so on, but we don't do it here. I'm not sure what happens here if one of the clusters fails?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am afraid of is if EsqlResolveFieldsAction returns partial response with say cluster A failing and thus having no indices but other clusters having indices, then A would be excluded from the list but not marked as skipped, and then there could be problems with it downstream.

l.onResponse(response.getIndexResponses().stream().map(FieldCapabilitiesIndexResponse::getIndexName).collect(toSet()));
})
);
}

/**
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,12 @@ private LogicalPlan analyzedPlan(LogicalPlan parsed, CsvTestsDataLoader.MultiInd

private static CsvTestsDataLoader.MultiIndexTestDataset testDatasets(LogicalPlan parsed) {
var preAnalysis = new PreAnalyzer().preAnalyze(parsed);
if (preAnalysis.index() == null) {
if (preAnalysis.indexPattern() == null) {
// If the data set doesn't matter we'll just grab one we know works. Employees is fine.
return CsvTestsDataLoader.MultiIndexTestDataset.of(CSV_DATASET_MAP.get("employees"));
}

String indexName = preAnalysis.index().indexPattern();
String indexName = preAnalysis.indexPattern().indexPattern();
List<CsvTestsDataLoader.TestDataset> datasets = new ArrayList<>();
if (indexName.endsWith("*")) {
String indexPrefix = indexName.substring(0, indexName.length() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ EnrichPolicyResolver mockEnrichResolver() {
ActionListener<EnrichResolution> listener = (ActionListener<EnrichResolution>) arguments[arguments.length - 1];
listener.onResponse(new EnrichResolution());
return null;
}).when(enrichResolver).resolvePolicies(any(), any(), any());
}).when(enrichResolver).resolvePolicies(any(), any(), any(), any());
return enrichResolver;
}

Expand Down