[ASTERIXDB-3340][EXT] Use proper cluster locations based on partitioning scheme
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
When specifying the constraints (cluster locations) of
an external data source:
Non-cloud deployment (Dynamic partitioning):
use a constraint that is sorted on the node
names (sorted cluster locations) so that it matches the constraint used
when creating an internal dataset.
Cloud deployment (Static partitioning):
use a constraint that is based on the partitioning map.
Change-Id: Ia830326f86712d5a0868979ed54afda00df53b78
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index bd72a66..1eac011 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -54,7 +54,7 @@
}
@Override
- protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
+ protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
return constraint;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index b3889fd..52054d6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -80,7 +80,7 @@
}
@Override
- protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
+ protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
return storageLocations;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
index 430ba36..b4d1b03 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
@@ -63,7 +63,7 @@
}
@Override
- protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- return csm.getSortedClusterLocations();
+ protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+ return md.getDataPartitioningProvider().getClusterLocations();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
index 67319e3..3bbaee2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
@@ -67,7 +67,7 @@
}
@Override
- protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- return csm.getSortedClusterLocations();
+ protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+ return md.getDataPartitioningProvider().getClusterLocations();
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index ffa6c87..a5d503b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -173,7 +173,7 @@
/**
* @return the constraint representing all the partitions of the cluster sorted by node name
*/
- AlgebricksAbsolutePartitionConstraint getSortedClusterLocations();
+ AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations();
/**
* @param excludePendingRemoval true, if the desired set shouldn't have pending removal nodes
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
index e59d4e7..828715d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -18,5 +18,10 @@
*/
package org.apache.asterix.common.dataflow;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
public interface IDataPartitioningProvider {
+
+ AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ac8859b..06662e5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -70,8 +70,8 @@
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
- this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager()
- .getSortedClusterLocations();
+ this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getDataPartitioningProvider()
+ .getClusterLocations();
this.filterEvaluatorFactory = filterEvaluatorFactory;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e92448c..f7638b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -252,7 +252,7 @@
public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
- return ((ICcApplicationContext) appCtx).getClusterStateManager().getSortedClusterLocations();
+ return ((ICcApplicationContext) appCtx).getDataPartitioningProvider().getClusterLocations();
}
return clusterLocations;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index f6aa347..91f7615 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -110,7 +110,7 @@
adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
FunctionDataSourceFactory factory =
- new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
+ new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm, metadataProvider)));
IDataParserFactory dataParserFactory = createDataParserFactory();
dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
dataParserFactory.configure(Collections.emptyMap());
@@ -126,8 +126,8 @@
protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
AlgebricksAbsolutePartitionConstraint locations);
- protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
- String[] sortedLocations = csm.getSortedClusterLocations().getLocations();
+ protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+ String[] sortedLocations = md.getDataPartitioningProvider().getClusterLocations().getLocations();
return new AlgebricksAbsolutePartitionConstraint(
Arrays.stream(sortedLocations).distinct().toArray(String[]::new));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 107cfb2..8d8ca80 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1010,9 +1010,14 @@
}
public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ //TODO(partitioning): should this be removed and getSortedClusterLocations() is used instead?
return appCtx.getClusterStateManager().getClusterLocations();
}
+ public DataPartitioningProvider getDataPartitioningProvider() {
+ return dataPartitioningProvider;
+ }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput,
IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 57392db..1a1c8ac 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -64,4 +65,9 @@
int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
}
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ return clusterStateManager.getNodeSortedClusterLocations();
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index 7206a50..44141cb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataConstants;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -50,10 +51,10 @@
@Override
public PartitioningProperties getPartitioningProperties(String databaseName) {
- SplitComputeLocations dataverseSplits = getSplits(databaseName);
+ SplitComputeLocations databaseSplits = getSplits(databaseName);
StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
- return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(),
+ return PartitioningProperties.of(databaseSplits.getSplitsProvider(), databaseSplits.getConstraints(),
partitionsMap);
}
@@ -127,4 +128,10 @@
new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
return new SplitComputeLocations(splitProvider, constraints);
}
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+ SplitComputeLocations locations = getSplits(MetadataConstants.DEFAULT_DATABASE);
+ return (AlgebricksAbsolutePartitionConstraint) locations.getConstraints();
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 0a10204..444d91b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -302,7 +302,7 @@
}
@Override
- public synchronized AlgebricksAbsolutePartitionConstraint getSortedClusterLocations() {
+ public synchronized AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations() {
String[] clone = getClusterLocations().getLocations().clone();
Arrays.sort(clone);
return new AlgebricksAbsolutePartitionConstraint(clone);