[ASTERIXDB-3340][EXT] Use sorted cluster locations for external data sources
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
When specifying the constraints (cluster locations) of
an external data source, use a constraints that is sorted on the node
names (sorted cluster locations) so that it matches the constraints used
when creating an internal dataset. Otherwise, when an internal dataset
is used with an external dataset (e.g. UNION ALL), the query will fail
due to incorrect partition assignment.
Change-Id: I5c7753d53c377d0c5e286b3bfea6b2358abf6d66
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18099
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
(cherry picked from commit 6308c3716b20e9cf91a9c641f92fe0004d67d150)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18105
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
index 82303fc..430ba36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
@@ -64,6 +64,6 @@
@Override
protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- return new AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations());
+ return csm.getSortedClusterLocations();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
index 458bff6..67319e3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
@@ -68,6 +68,6 @@
@Override
protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- return new AlgebricksAbsolutePartitionConstraint(csm.getClusterLocations().getLocations());
+ return csm.getSortedClusterLocations();
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 76802d9..fa62392 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -172,6 +172,11 @@
AlgebricksAbsolutePartitionConstraint getClusterLocations();
/**
+ * @return the constraint representing all the partitions of the cluster sorted by node name
+ */
+ AlgebricksAbsolutePartitionConstraint getSortedClusterLocations();
+
+ /**
* @param excludePendingRemoval
* true, if the desired set shouldn't have pending removal nodes
* @return the set of participant nodes
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index eac4835..9760c55 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -70,8 +70,8 @@
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
throws AlgebricksException {
this.configuration = configuration;
- this.partitionConstraint =
- ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
+ this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager()
+ .getSortedClusterLocations();
}
public static class PartitionWorkLoadBasedOnSize implements Serializable {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 3506216..f4edafc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -265,7 +265,7 @@
public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
- return ((ICcApplicationContext) appCtx).getClusterStateManager().getClusterLocations();
+ return ((ICcApplicationContext) appCtx).getClusterStateManager().getSortedClusterLocations();
}
return clusterLocations;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 5c874d7..97c4c5d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -20,9 +20,7 @@
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -112,9 +110,9 @@
AlgebricksAbsolutePartitionConstraint locations);
protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- String[] allPartitions = csm.getClusterLocations().getLocations();
- Set<String> ncs = new HashSet<>(Arrays.asList(allPartitions));
- return new AlgebricksAbsolutePartitionConstraint(ncs.toArray(new String[ncs.size()]));
+ String[] sortedLocations = csm.getSortedClusterLocations().getLocations();
+ return new AlgebricksAbsolutePartitionConstraint(
+ Arrays.stream(sortedLocations).distinct().toArray(String[]::new));
}
protected IDataParserFactory createDataParserFactory() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index d3c87ff..1ff62a9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -289,6 +289,13 @@
return clusterPartitionConstraint;
}
+ @Override
+ public synchronized AlgebricksAbsolutePartitionConstraint getSortedClusterLocations() {
+ String[] clone = getClusterLocations().getLocations().clone();
+ Arrays.sort(clone);
+ return new AlgebricksAbsolutePartitionConstraint(clone);
+ }
+
private synchronized void resetClusterPartitionConstraint() {
ArrayList<String> clusterActiveLocations = new ArrayList<>();
for (ClusterPartition p : clusterPartitions.values()) {