[ASTERIXDB-3340][EXT] Use proper cluster locations based on partitioning scheme

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
When specifying the constraints (cluster locations) of
an external data source:

Non-cloud deployment (Dynamic partitioning):
use a constraints that is sorted on the node
names (sorted cluster locations) so that it matches the constraints used
when creating an internal dataset.

Cloud-deployment (Static partitioning):
use a constraints that is based on the partitioning map.

Change-Id: Ia830326f86712d5a0868979ed54afda00df53b78
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18143
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index bd72a66..1eac011 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -54,7 +54,7 @@
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
+    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
         return constraint;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index b3889fd..52054d6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -80,7 +80,7 @@
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
+    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
         return storageLocations;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
index 430ba36..b4d1b03 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSAllTablesDataGeneratorDatasource.java
@@ -63,7 +63,7 @@
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
-        return csm.getSortedClusterLocations();
+    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+        return md.getDataPartitioningProvider().getClusterLocations();
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
index 67319e3..3bbaee2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/TPCDSSingleTableDataGeneratorDatasource.java
@@ -67,7 +67,7 @@
     }
 
     @Override
-    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
-        return csm.getSortedClusterLocations();
+    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+        return md.getDataPartitioningProvider().getClusterLocations();
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index ffa6c87..a5d503b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -173,7 +173,7 @@
     /**
      * @return the constraint representing all the partitions of the cluster sorted by node name
      */
-    AlgebricksAbsolutePartitionConstraint getSortedClusterLocations();
+    AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations();
 
     /**
      * @param excludePendingRemoval true, if the desired set shouldn't have pending removal nodes
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
index e59d4e7..828715d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IDataPartitioningProvider.java
@@ -18,5 +18,10 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+
 public interface IDataPartitioningProvider {
+
+    AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ac8859b..06662e5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -70,8 +70,8 @@
     public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
             IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
         this.configuration = configuration;
-        this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getClusterStateManager()
-                .getSortedClusterLocations();
+        this.partitionConstraint = ((ICcApplicationContext) ctx.getApplicationContext()).getDataPartitioningProvider()
+                .getClusterLocations();
         this.filterEvaluatorFactory = filterEvaluatorFactory;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index e92448c..f7638b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -252,7 +252,7 @@
     public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(IApplicationContext appCtx,
             AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
-            return ((ICcApplicationContext) appCtx).getClusterStateManager().getSortedClusterLocations();
+            return ((ICcApplicationContext) appCtx).getDataPartitioningProvider().getClusterLocations();
         }
         return clusterLocations;
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index f6aa347..91f7615 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -110,7 +110,7 @@
         adapterFactory.setOutputType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         IClusterStateManager csm = metadataProvider.getApplicationContext().getClusterStateManager();
         FunctionDataSourceFactory factory =
-                new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm)));
+                new FunctionDataSourceFactory(createFunction(metadataProvider, getLocations(csm, metadataProvider)));
         IDataParserFactory dataParserFactory = createDataParserFactory();
         dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
         dataParserFactory.configure(Collections.emptyMap());
@@ -126,8 +126,8 @@
     protected abstract IDatasourceFunction createFunction(MetadataProvider metadataProvider,
             AlgebricksAbsolutePartitionConstraint locations);
 
-    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm) {
-        String[] sortedLocations = csm.getSortedClusterLocations().getLocations();
+    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
+        String[] sortedLocations = md.getDataPartitioningProvider().getClusterLocations().getLocations();
         return new AlgebricksAbsolutePartitionConstraint(
                 Arrays.stream(sortedLocations).distinct().toArray(String[]::new));
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 107cfb2..8d8ca80 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1010,9 +1010,14 @@
     }
 
     public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        //TODO(partitioning): should this be removed and getSortedClusterLocations() is used instead?
         return appCtx.getClusterStateManager().getClusterLocations();
     }
 
+    public DataPartitioningProvider getDataPartitioningProvider() {
+        return dataPartitioningProvider;
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
             JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput,
             IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 57392db..1a1c8ac 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -64,4 +65,9 @@
         int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
         return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
     }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        return clusterStateManager.getNodeSortedClusterLocations();
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index 7206a50..44141cb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -50,10 +51,10 @@
 
     @Override
     public PartitioningProperties getPartitioningProperties(String databaseName) {
-        SplitComputeLocations dataverseSplits = getSplits(databaseName);
+        SplitComputeLocations databaseSplits = getSplits(databaseName);
         StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
         int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
-        return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(),
+        return PartitioningProperties.of(databaseSplits.getSplitsProvider(), databaseSplits.getConstraints(),
                 partitionsMap);
     }
 
@@ -127,4 +128,10 @@
                 new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
         return new SplitComputeLocations(splitProvider, constraints);
     }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        SplitComputeLocations locations = getSplits(MetadataConstants.DEFAULT_DATABASE);
+        return (AlgebricksAbsolutePartitionConstraint) locations.getConstraints();
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 0a10204..444d91b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -302,7 +302,7 @@
     }
 
     @Override
-    public synchronized AlgebricksAbsolutePartitionConstraint getSortedClusterLocations() {
+    public synchronized AlgebricksAbsolutePartitionConstraint getNodeSortedClusterLocations() {
         String[] clone = getClusterLocations().getLocations().clone();
         Arrays.sort(clone);
         return new AlgebricksAbsolutePartitionConstraint(clone);