Refactor Active Listeners

Change-Id: I260c8608329523f56dc54780d87d796f838505cf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1118
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 30a2eb6..5ff02c7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -198,4 +198,17 @@
             LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
         }
     }
+
+    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+        if (DEBUG) {
+            LOGGER.log(Level.WARNING,
+                    "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+                            + listener.getEntityId());
+        }
+        IActiveEntityEventsListener registeredListener = entityEventListener.remove(listener.getEntityId());
+        if (registeredListener == null) {
+            throw new HyracksDataException(
+                    "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
+        }
+    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 156576c..2dd9fe7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,4 +31,6 @@
 
     public EntityId getEntityId();
 
+    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName);
+
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
index 916355d..a574711 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -18,14 +18,13 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -45,7 +44,8 @@
 public class IntroduceMaterializationForInsertWithSelfScanRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -105,11 +105,10 @@
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
                 DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
                 AqlDataSource ds = (AqlDataSource) dataSourceScanOp.getDataSource();
-                if (ds.getDatasourceType() != AqlDataSourceType.FEED
-                        && ds.getDatasourceType() != AqlDataSourceType.LOADABLE) {
-                    if (((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) {
-                        return true;
-                    }
+                if ((ds.getDatasourceType() == AqlDataSourceType.INTERNAL_DATASET
+                        || ds.getDatasourceType() == AqlDataSourceType.EXTERNAL_DATASET)
+                        && ((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) {
+                    return true;
                 }
             }
             sameDataset = checkIfInsertAndScanDatasetsSame(descendantOp, insertDatasetName);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 4f5a848..815bd93 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
-import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -41,7 +41,8 @@
 public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         ILogicalOperator op = opRef.getValue();
         if (!op.getOperatorTag().equals(LogicalOperatorTag.ASSIGN)) {
             return false;
@@ -54,7 +55,7 @@
 
         DataSourceScanOperator scanOp = (DataSourceScanOperator) opChild;
         AqlDataSource dataSource = (AqlDataSource) scanOp.getDataSource();
-        if (!dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) {
+        if (dataSource.getDatasourceType() != AqlDataSourceType.FEED) {
             return false;
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index 1fa7730..06a6f37 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -85,7 +85,10 @@
             LogicalVariable dataVar = dataSource.getDataRecordVariable(allVars);
             LogicalVariable metaVar = dataSource.getMetaVariable(allVars);
             LogicalExpressionReferenceTransform currentTransformer = null;
-            if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) {
+            // https://issues.apache.org/jira/browse/ASTERIXDB-1618
+            if (dataSource.getDatasourceType() != AqlDataSourceType.EXTERNAL_DATASET
+                    && dataSource.getDatasourceType() != AqlDataSourceType.INTERNAL_DATASET
+                    && dataSource.getDatasourceType() != AqlDataSourceType.LOADABLE) {
                 IMutationDataSource mds = (IMutationDataSource) dataSource;
                 if (mds.isChange()) {
                     transformers = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
index 079f61a..7af3c1a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
@@ -24,7 +24,6 @@
 import java.util.List;
 
 import org.apache.asterix.algebra.base.AsterixOperatorAnnotations;
-import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
@@ -40,6 +39,7 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.optimizer.base.AnalysisUtil;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -85,7 +85,7 @@
         }
         AssignOperator access = (AssignOperator) op;
         ILogicalExpression expr = getFirstExpr(access);
-        String finalAnnot = null;
+        String finalAnnot;
         if (AnalysisUtil.isAccessToFieldRecord(expr)) {
             finalAnnot = AsterixOperatorAnnotations.PUSHED_FIELD_ACCESS;
         } else if (AnalysisUtil.isRunnableAccessToFieldRecord(expr)) {
@@ -195,17 +195,17 @@
             propagateFieldAccessRec(opRef2, context, finalAnnot);
             return true;
         }
-        List<LogicalVariable> usedInAccess = new LinkedList<LogicalVariable>();
+        List<LogicalVariable> usedInAccess = new LinkedList<>();
         VariableUtilities.getUsedVariables(access, usedInAccess);
 
-        List<LogicalVariable> produced2 = new LinkedList<LogicalVariable>();
+        List<LogicalVariable> produced2 = new LinkedList<>();
         if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
             VariableUtilities.getLiveVariables(op2, produced2);
         } else {
             VariableUtilities.getProducedVariables(op2, produced2);
         }
         boolean pushItDown = false;
-        List<LogicalVariable> inter = new ArrayList<LogicalVariable>(usedInAccess);
+        List<LogicalVariable> inter = new ArrayList<>(usedInAccess);
         if (inter.isEmpty()) { // ground value
             return false;
         }
@@ -214,7 +214,8 @@
             pushItDown = true;
         } else if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
             GroupByOperator g = (GroupByOperator) op2;
-            List<Pair<LogicalVariable, LogicalVariable>> varMappings = new ArrayList<Pair<LogicalVariable, LogicalVariable>>();
+            List<Pair<LogicalVariable, LogicalVariable>> varMappings =
+                    new ArrayList<>();
             for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) {
                 ILogicalExpression e = p.second.getValue();
                 if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
@@ -222,7 +223,7 @@
                     if (inter.contains(decorVar)) {
                         inter.remove(decorVar);
                         LogicalVariable v1 = ((VariableReferenceExpression) e).getVariableReference();
-                        varMappings.add(new Pair<LogicalVariable, LogicalVariable>(decorVar, v1));
+                        varMappings.add(new Pair<>(decorVar, v1));
                     }
                 }
             }
@@ -257,7 +258,7 @@
                 return true;
             } else {
                 for (Mutable<ILogicalOperator> inp : op2.getInputs()) {
-                    HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>();
+                    HashSet<LogicalVariable> v2 = new HashSet<>();
                     VariableUtilities.getLiveVariables(inp.getValue(), v2);
                     if (v2.containsAll(usedInAccess)) {
                         pushAccessDown(opRef, op2, inp, context, finalAnnot);
@@ -269,7 +270,7 @@
                 AbstractOperatorWithNestedPlans nestedOp = (AbstractOperatorWithNestedPlans) op2;
                 for (ILogicalPlan plan : nestedOp.getNestedPlans()) {
                     for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-                        HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>();
+                        HashSet<LogicalVariable> v2 = new HashSet<>();
                         VariableUtilities.getLiveVariables(root.getValue(), v2);
                         if (v2.containsAll(usedInAccess)) {
                             pushAccessDown(opRef, op2, root, context, finalAnnot);
@@ -297,7 +298,7 @@
                         ILogicalExpression e1 = accessFun.getArguments().get(1).getValue();
                         if (e1.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
                             IDataSource<AqlSourceId> dataSource = (IDataSource<AqlSourceId>) scan.getDataSource();
-                            AqlDataSourceType dsType = ((AqlDataSource) dataSource).getDatasourceType();
+                            byte dsType = ((AqlDataSource) dataSource).getDatasourceType();
                             if (dsType == AqlDataSourceType.FEED || dsType == AqlDataSourceType.LOADABLE) {
                                 return false;
                             }
@@ -368,7 +369,7 @@
     // indirect recursivity with propagateFieldAccessRec
     private void pushAccessDown(Mutable<ILogicalOperator> fldAccessOpRef, ILogicalOperator op2,
             Mutable<ILogicalOperator> inputOfOp2, IOptimizationContext context, String finalAnnot)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         ILogicalOperator fieldAccessOp = fldAccessOpRef.getValue();
         fldAccessOpRef.setValue(op2);
         List<Mutable<ILogicalOperator>> faInpList = fieldAccessOp.getInputs();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
index b088607..25dcf35 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
@@ -18,10 +18,9 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -33,7 +32,8 @@
 public class RemoveSortInFeedIngestionRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -51,7 +51,7 @@
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
                 AqlDataSource dataSource = (AqlDataSource) ((DataSourceScanOperator) descendantOp).getDataSource();
-                if (dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) {
+                if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) {
                     isSourceAFeed = true;
                 }
                 break;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index f483d70..01476e7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -228,7 +228,7 @@
                     DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) sourceOpRefs.get(i).getValue();
                     IDataSource<?> datasource = dataSourceScan.getDataSource();
                     if (datasource instanceof AqlDataSource) {
-                        AqlDataSourceType dsType = ((AqlDataSource) datasource).getDatasourceType();
+                        byte dsType = ((AqlDataSource) datasource).getDatasourceType();
                         if (dsType != AqlDataSourceType.INTERNAL_DATASET
                                 && dsType != AqlDataSourceType.EXTERNAL_DATASET) {
                             return false;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
index 5b91f2f..015044c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
@@ -37,6 +37,7 @@
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.app.SessionConfig;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -51,7 +52,6 @@
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
@@ -119,7 +119,7 @@
         }
     }
 
-    private enum ResultFields {
+    public enum ResultFields {
         REQUEST_ID("requestID"),
         SIGNATURE("signature"),
         TYPE("type"),
@@ -139,7 +139,7 @@
         }
     }
 
-    private enum ResultStatus {
+    public enum ResultStatus {
         SUCCESS("success"),
         TIMEOUT("timeout"),
         ERRORS("errors"),
@@ -331,7 +331,7 @@
     }
 
     private static void printError(PrintWriter pw, Throwable e) {
-        Throwable rootCause = ExceptionUtils.getRootCause(e);
+        Throwable rootCause = ResultUtil.getRootCause(e);
         if (rootCause == null) {
             rootCause = e;
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
index b5fd96e..979f7f4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
@@ -158,12 +158,12 @@
         return errorMessage.toString();
     }
 
-    private static Throwable getRootCause(Throwable cause) {
+    public static Throwable getRootCause(Throwable cause) {
         Throwable currentCause = cause;
         Throwable nextCause = cause.getCause();
         while (nextCause != null && nextCause != currentCause) {
             currentCause = nextCause;
-            nextCause = cause.getCause();
+            nextCause = nextCause.getCause();
         }
         return currentCause;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6390b9d..727a1f9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -190,6 +190,7 @@
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -696,8 +697,7 @@
         StringBuilder builder = null;
         IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener instanceof FeedEventsListener
-                    && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+            if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -706,7 +706,7 @@
         }
         if (builder != null) {
             throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being "
-                    + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
+                    + "fed into by the following active entities.\n" + builder.toString());
         }
     }
 
@@ -1411,22 +1411,11 @@
         Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
         if (ds.getDatasetType() == DatasetType.INTERNAL) {
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-            IActiveEntityEventsListener[] feedConnections = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-            for (IActiveEntityEventsListener conn : feedConnections) {
-                if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) {
-                    FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName);
-                    Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
-                            connectionId);
-                    disconnectJobList.put(connectionId, p);
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName() + " from dataset "
-                                + datasetName + " as dataset is being dropped");
-                    }
-                    // prepare job to remove feed log storage
-                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(
-                            mdTxnCtx.getValue(), connectionId.getFeedId().getDataverse(),
-                            connectionId.getFeedId().getEntityName())));
+            IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            for (IActiveEntityEventsListener listener : activeListeners) {
+                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                    throw new AsterixException(
+                            "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
                 }
             }
 
@@ -1547,8 +1536,7 @@
             IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
                     if (builder == null) {
                         builder = new StringBuilder();
                     }
@@ -1557,8 +1545,8 @@
             }
             if (builder != null) {
                 throw new AsterixException(
-                        "Dataset" + datasetName + " is currently being fed into by the following feeds " + "."
-                                + builder.toString() + "\nOperation not supported.");
+                        "Dataset" + datasetName + " is currently being fed into by the following active entities: "
+                                + builder.toString());
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1709,7 +1697,7 @@
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
                     throw new IllegalStateException("System is inconsistent state: pending index("
-                    + dataverseName + "." + datasetName + "."
+                            + dataverseName + "." + datasetName + "."
                             + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
@@ -2008,17 +1996,19 @@
         }
     }
 
-    protected void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+    protected void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+            throws AlgebricksException, HyracksDataException {
         String dataverse;
         String policy;
         FeedPolicyEntity newPolicy = null;
+        MetadataTransactionContext mdTxnCtx = null;
         CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
         dataverse = getActiveDataverse(null);
         policy = cfps.getPolicyName();
         MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
         try {
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
             FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
                     .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
             if (feedPolicy != null) {
@@ -2036,8 +2026,9 @@
                         .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse,
                                 cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
-                    sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
-                            MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                    sourceFeedPolicy =
+                            MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                                    MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
                     if (sourceFeedPolicy == null) {
                         throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
                     }
@@ -2061,9 +2052,9 @@
             }
             MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, newPolicy);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
+        } catch (RemoteException | ACIDException e) {
             abort(e, e, mdTxnCtx);
-            throw e;
+            throw new HyracksDataException(e);
         } finally {
             MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy);
         }
@@ -2350,7 +2341,7 @@
         IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        if (listener == null || !listener.isConnectedToDataset(datasetName)) {
+        if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
             throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
         }
@@ -2873,7 +2864,7 @@
             default:
                 throw new AlgebricksException(
                         "The system \"" + runStmt.getSystem() +
-                        "\" specified in your run statement is not supported.");
+                                "\" specified in your run statement is not supported.");
         }
 
     }
@@ -3107,12 +3098,21 @@
         return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
     }
 
+    /**
+     * Abort the ongoing metadata transaction logging the error cause
+     *
+     * @param rootE
+     * @param parentE
+     * @param mdTxnCtx
+     */
     public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
         try {
             if (IS_DEBUG_MODE) {
                 LOGGER.log(Level.SEVERE, rootE.getMessage(), rootE);
             }
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            if (mdTxnCtx != null) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
         } catch (Exception e2) {
             parentE.addSuppressed(e2);
             throw new IllegalStateException(rootE);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 47290e7..4dc206c 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -43,10 +43,10 @@
 import org.apache.asterix.test.server.ITestServer;
 import org.apache.asterix.test.server.TestServerProvider;
 import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.context.TestFileContext;
 import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
-import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.asterix.testframework.context.TestFileContext;
 import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
+import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.http.HttpResponse;
@@ -249,7 +249,7 @@
         }
     }
 
-    private HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+    protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
         HttpClient client = HttpClients.custom()
                 .setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE)
                 .build();
@@ -270,8 +270,8 @@
             String errorBody = EntityUtils.toString(httpResponse.getEntity());
             try {
                 JSONObject result = new JSONObject(errorBody);
-                String[] errors = {result.getJSONArray("error-code").getString(0), result.getString("summary"),
-                        result.getString("stacktrace")};
+                String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+                        result.getString("stacktrace") };
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
                 String exceptionMsg = "HTTP operation failed: " + errors[0]
                         + "\nSTATUS LINE: " + httpResponse.getStatusLine()
@@ -307,7 +307,7 @@
         return response.getEntity().getContent();
     }
 
-    private void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) {
+    protected void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) {
         boolean formatSet = false;
         for (CompilationUnit.Parameter param : params) {
             if ("format".equals(param.getName())) {
@@ -344,7 +344,7 @@
         return builder.build();
     }
 
-    private HttpUriRequest constructPostMethod(String statement, String endpoint, String stmtParam,
+    protected HttpUriRequest constructPostMethod(String statement, String endpoint, String stmtParam,
             boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) {
         RequestBuilder builder = RequestBuilder.post(endpoint);
         if (postStmtAsParam) {
@@ -514,7 +514,6 @@
         executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
     }
 
-
     public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
             boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
             List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
@@ -699,7 +698,7 @@
                 }
                 break;
             case "server": // (start <test server name> <port>
-                               // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
+                           // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
                 try {
                     lines = statement.trim().split("\n");
                     String[] command = lines[lines.length - 1].trim().split(" ");
@@ -747,7 +746,7 @@
                 }
                 break;
             case "lib": // expected format <dataverse-name> <library-name>
-                            // <library-directory>
+                        // <library-directory>
                         // TODO: make this case work well with entity names containing spaces by
                         // looking for \"
                 lines = statement.split("\n");
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index f4e67f3..c7bed8d9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -180,4 +180,8 @@
     public void setMetaType(ARecordType metaType) {
         this.metaType = metaType;
     }
+
+    public IExternalDataSourceFactory getDataSourceFactory() {
+        return dataSourceFactory;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index c40fed6..7f2191c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -611,7 +611,7 @@
         cInfo.setState(ActivityState.ACTIVE);
     }
 
-    public synchronized boolean isConnectedToDataset(String datasetName) {
+    private synchronized boolean isConnectedToDataset(String datasetName) {
         for (FeedConnectionId connection : connectJobInfos.keySet()) {
             if (connection.getDatasetName().equals(datasetName)) {
                 return true;
@@ -641,4 +641,9 @@
     public IFeedJoint getSourceFeedJoint() {
         return sourceFeedJoint;
     }
+
+    @Override
+    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) {
+        return isConnectedToDataset(datasetName);
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
index 1272d03..d406108 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
@@ -24,29 +24,19 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -56,19 +46,24 @@
     protected final AqlSourceId id;
     protected final IAType itemType;
     protected final IAType metaItemType;
-    protected final AqlDataSourceType datasourceType;
+    protected final byte datasourceType;
     protected IAType[] schemaTypes;
     protected INodeDomain domain;
     protected Map<String, Serializable> properties = new HashMap<>();
 
-    public enum AqlDataSourceType {
-        INTERNAL_DATASET,
-        EXTERNAL_DATASET,
-        FEED,
-        LOADABLE
+    public static class AqlDataSourceType {
+        // positive range is reserved for core datasource types
+        public static final byte INTERNAL_DATASET = 0x00;
+        public static final byte EXTERNAL_DATASET = 0x01;
+        public static final byte FEED = 0x02;
+        public static final byte LOADABLE = 0x03;
+
+        // Hide implicit public constructor
+        private AqlDataSourceType() {
+        }
     }
 
-    public AqlDataSource(AqlSourceId id, IAType itemType, IAType metaItemType, AqlDataSourceType datasourceType,
+    public AqlDataSource(AqlSourceId id, IAType itemType, IAType metaItemType, byte datasourceType,
             INodeDomain domain) throws AlgebricksException {
         this.id = id;
         this.itemType = itemType;
@@ -118,72 +113,7 @@
         }
     }
 
-    private static class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
-
-        private final AqlDataSource ds;
-
-        private final INodeDomain domain;
-
-        public AqlDataSourcePartitioningProvider(AqlDataSource dataSource, INodeDomain domain) {
-            this.ds = dataSource;
-            this.domain = domain;
-        }
-
-        @Override
-        public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
-            IPhysicalPropertiesVector propsVector = null;
-            IPartitioningProperty pp;
-            List<ILocalStructuralProperty> propsLocal;
-            int n;
-            switch (ds.getDatasourceType()) {
-                case LOADABLE:
-                case EXTERNAL_DATASET:
-                    pp = new RandomPartitioningProperty(domain);
-                    propsLocal = new ArrayList<ILocalStructuralProperty>();
-                    ds.computeLocalStructuralProperties(propsLocal, scanVariables);
-                    propsVector = new StructuralPropertiesVector(pp, propsLocal);
-                    break;
-
-                case FEED:
-                    n = scanVariables.size();
-                    if (n < 2) {
-                        pp = new RandomPartitioningProperty(domain);
-                    } else {
-                        Set<LogicalVariable> pvars = new ListSet<LogicalVariable>();
-                        pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
-                        pp = new UnorderedPartitionedProperty(pvars, domain);
-                    }
-                    propsLocal = new ArrayList<ILocalStructuralProperty>();
-                    propsVector = new StructuralPropertiesVector(pp, propsLocal);
-                    break;
-
-                case INTERNAL_DATASET:
-                    n = scanVariables.size();
-                    Set<LogicalVariable> pvars = new ListSet<LogicalVariable>();
-                    if (n < 2) {
-                        pp = new RandomPartitioningProperty(domain);
-                    } else {
-                        pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
-                        pp = new UnorderedPartitionedProperty(pvars, domain);
-                    }
-                    propsLocal = new ArrayList<ILocalStructuralProperty>();
-                    List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
-                    for (LogicalVariable pkVar : pvars) {
-                        orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC));
-                    }
-                    propsLocal.add(new LocalOrderProperty(orderColumns));
-                    propsVector = new StructuralPropertiesVector(pp, propsLocal);
-                    break;
-
-                default:
-                    throw new IllegalArgumentException();
-            }
-            return propsVector;
-        }
-
-    }
-
-    public AqlDataSourceType getDatasourceType() {
+    public byte getDatasourceType() {
         return datasourceType;
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java
new file mode 100644
index 0000000..14a6e68
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+
+public class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
+
+    private final AqlDataSource ds;
+    private final INodeDomain domain;
+
+    public AqlDataSourcePartitioningProvider(AqlDataSource dataSource, INodeDomain domain) {
+        this.ds = dataSource;
+        this.domain = domain;
+    }
+
+    @Override
+    public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
+        IPhysicalPropertiesVector propsVector;
+        IPartitioningProperty pp;
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
+        switch (ds.getDatasourceType()) {
+            case AqlDataSourceType.LOADABLE:
+            case AqlDataSourceType.EXTERNAL_DATASET:
+                pp = new RandomPartitioningProperty(domain);
+                ds.computeLocalStructuralProperties(propsLocal, scanVariables);
+                break;
+            case AqlDataSourceType.FEED:
+                pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+                break;
+            case AqlDataSourceType.INTERNAL_DATASET:
+                Set<LogicalVariable> pvars = new ListSet<>();
+                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars);
+                propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+        propsVector = new StructuralPropertiesVector(pp, propsLocal);
+        return propsVector;
+    }
+
+    private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) {
+        List<OrderColumn> orderColumns = new ArrayList<>();
+        for (LogicalVariable pkVar : pvars) {
+            orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC));
+        }
+        return orderColumns;
+    }
+
+    private static IPartitioningProperty getInternalDatasetPartitioningProperty(AqlDataSource ds, INodeDomain domain,
+            List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars) {
+        IPartitioningProperty pp;
+        if (scanVariables.size() < 2) {
+            pp = new RandomPartitioningProperty(domain);
+        } else {
+            pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+            pp = new UnorderedPartitionedProperty(pvars, domain);
+        }
+        return pp;
+    }
+
+    public static IPartitioningProperty getFeedPartitioningProperty(AqlDataSource ds, INodeDomain domain,
+            List<LogicalVariable> scanVariables) {
+        IPartitioningProperty pp;
+        if (scanVariables.size() < 2) {
+            pp = new RandomPartitioningProperty(domain);
+        } else {
+            Set<LogicalVariable> pvars = new ListSet<>();
+            pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+            pp = new UnorderedPartitionedProperty(pvars, domain);
+        }
+        return pp;
+    }
+
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index da54e32..72360b6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -481,8 +481,7 @@
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] btreeFields = null;
@@ -497,10 +496,10 @@
                 }
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
                         getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-                        secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
-                        secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
-                        dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
-                        secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+                                secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
+                                secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
+                                dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
+                                secondaryIndex.getKeyFieldSourceIndicators(), metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -581,12 +580,12 @@
                 int[] buddyBreeFields = new int[] { numSecondaryKeys };
                 ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
                         new ExternalBTreeWithBuddyDataflowHelperFactory(
-                        compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                        getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                                compactionInfo.first, compactionInfo.second,
+                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
                 btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -703,7 +702,7 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+                            dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
                 metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
@@ -718,8 +717,7 @@
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] rtreeFields = null;
@@ -864,33 +862,13 @@
         IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
         INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
-        AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
+        byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
                 ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
         return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
                 dataset.getDatasetDetails(), domain);
     }
 
     @Override
-    public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
-        boolean result = false;
-        switch (((AqlDataSource) dataSource).getDatasourceType()) {
-            case INTERNAL_DATASET:
-            case EXTERNAL_DATASET:
-                result = ((DatasetDataSource) dataSource).getDataset().getDatasetType() == DatasetType.EXTERNAL;
-                break;
-            case FEED:
-                result = true;
-                break;
-            case LOADABLE:
-                result = true;
-                break;
-            default:
-                break;
-        }
-        return result;
-    }
-
-    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
             IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
             LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
@@ -946,14 +924,13 @@
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = DatasetUtils.createFilterFields(dataset);
             int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -1037,7 +1014,7 @@
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1359,7 +1336,7 @@
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
+                            dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
 
             // Generate Output Record format
             ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -1491,8 +1468,7 @@
                     dataset.getDatasetName(), indexName);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] btreeFields = null;
@@ -1529,7 +1505,7 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+                            dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1734,7 +1710,7 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+                            dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1869,15 +1845,14 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+                            dataverseName, datasetName, indexName, temp);
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] rtreeFields = null;
@@ -2216,7 +2191,7 @@
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+                            dataSource.getId().getDataverseName(), datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2227,8 +2202,7 @@
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = DatasetUtils.createFilterFields(dataset);
             int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -2512,8 +2486,7 @@
                     secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
 
             int[] filterFields = null;
@@ -2539,7 +2512,7 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+                            dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2692,15 +2665,14 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+                            dataverseName, datasetName, indexName, temp);
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
             }
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] rtreeFields = null;
@@ -2827,8 +2799,7 @@
                     dataset.getDatasetName(), indexName);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                    computeFilterBinaryComparatorFactories(dataset,
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                     recType, context.getBinaryComparatorFactoryProvider());
             int[] filterFields = null;
             int[] btreeFields = null;
@@ -2849,15 +2820,15 @@
                 Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
                         secondaryKeyNames.get(i), recType);
                 IAType keyType = keyPairType.first;
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
-                        getBinaryComparatorFactory(keyType, true);
+                comparatorFactories[i] =
+                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             }
             List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
             for (List<String> partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
-                        getBinaryComparatorFactory(keyType, true);
+                comparatorFactories[i] =
+                        AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }
@@ -2865,7 +2836,7 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                     splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+                            dataverseName, datasetName, indexName, temp);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 5a601bc..9729c77 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataManager;
@@ -48,7 +49,7 @@
     private Dataset dataset;
 
     public DatasetDataSource(AqlSourceId id, Dataset dataset, IAType itemType, IAType metaItemType,
-            AqlDataSourceType datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
+            byte datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
             throws AlgebricksException {
         super(id, itemType, metaItemType, datasourceType, datasetDomain);
         this.dataset = dataset;
@@ -141,4 +142,9 @@
         }
     }
 
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL;
+    }
+
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 0a81f03..d74bf9f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -204,4 +204,9 @@
             throw new AlgebricksException(e);
         }
     }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 2ffaded..200a5a9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -144,4 +144,9 @@
         RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
         return aqlMetadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
     }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
index 300033f..63ce5fa 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
@@ -31,4 +31,7 @@
     public IDataSourcePropertiesProvider getPropertiesProvider();
 
     public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList);
+
+    // https://issues.apache.org/jira/browse/ASTERIXDB-1619
+    public boolean isScanAccessPathALeaf();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 8466ef9..68710a5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -48,13 +48,11 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-                    throws AlgebricksException;
-
-    public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
+            throws AlgebricksException;
 
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-                    throws AlgebricksException;
+            throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
@@ -63,7 +61,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
-                    throws AlgebricksException;
+            throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
index d395f3c..5b60022 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
@@ -77,4 +77,9 @@
     @Override
     public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
     }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f586af7..7bcb1d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -83,7 +83,7 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -133,14 +133,9 @@
     }
 
     @Override
-    public boolean scannerOperatorIsLeaf(IDataSource<String> dataSource) {
-        return true;
-    }
-
-    @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         PigletFileDataSink ds = (PigletFileDataSink) sink;
         FileSplit[] fileSplits = ds.getFileSplits();
         String[] locations = new String[fileSplits.length];
@@ -192,7 +187,7 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8701851..ddfb331 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -37,7 +37,6 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -269,8 +268,7 @@
                     DataSourceScanOperator scan = (DataSourceScanOperator) op;
                     IDataSource dataSource = scan.getDataSource();
                     DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
-                    IMetadataProvider mp = context.getMetadataProvider();
-                    if (mp.scannerOperatorIsLeaf(dataSource)) {
+                    if (dataSource.isScanAccessPathALeaf()) {
                         dss.disableJobGenBelowMe();
                     }
                     op.setPhysicalOperator(dss);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 4cddef1..1272562 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,10 +34,7 @@
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.xml.sax.InputSource;
+
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -86,10 +83,12 @@
 import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.TriggerNCWork;
 import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
 import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
@@ -105,6 +104,7 @@
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.ini4j.Ini;
 import org.xml.sax.InputSource;
 
 public class ClusterControllerService implements IControllerService {
@@ -351,6 +351,7 @@
     public Map<String, NodeControllerState> getNodeMap() {
         return nodeRegistry;
     }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -406,21 +407,24 @@
                 }
 
                 case GET_JOB_STATUS: {
-                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                            (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
                     workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
                             new IPCResponder<JobStatus>(handle, mid)));
                     return;
                 }
 
                 case GET_JOB_INFO: {
-                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                            (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
                     workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
                             new IPCResponder<JobInfo>(handle, mid)));
                     return;
                 }
 
                 case START_JOB: {
-                    HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                    HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                            (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
                     workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
                             sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
@@ -434,14 +438,16 @@
                 }
 
                 case GET_DATASET_RESULT_STATUS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
                     workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
                             gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
                     return;
                 }
 
                 case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
                     workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
                             gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
                             new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
@@ -449,7 +455,8 @@
                 }
 
                 case WAIT_FOR_COMPLETION: {
-                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                            (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
                     workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
                             new IPCResponder<Object>(handle, mid)));
                     return;
@@ -471,14 +478,16 @@
                 }
 
                 case CLI_DEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                            (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
                     workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
                             dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
 
                 case CLI_UNDEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
+                            (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
                     workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
                             new IPCResponder<DeploymentId>(handle, mid)));
                     return;
@@ -556,21 +565,24 @@
                 }
 
                 case REGISTER_PARTITION_PROVIDER: {
-                    CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+                    CCNCFunctions.RegisterPartitionProviderFunction rppf =
+                            (CCNCFunctions.RegisterPartitionProviderFunction) fn;
                     workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
                             rppf.getPartitionDescriptor()));
                     return;
                 }
 
                 case REGISTER_PARTITION_REQUEST: {
-                    CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+                    CCNCFunctions.RegisterPartitionRequestFunction rprf =
+                            (CCNCFunctions.RegisterPartitionRequestFunction) fn;
                     workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
                             rprf.getPartitionRequest()));
                     return;
                 }
 
                 case REGISTER_RESULT_PARTITION_LOCATION: {
-                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+                            (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
                             rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
@@ -578,21 +590,24 @@
                 }
 
                 case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
-                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf =
+                            (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
                     workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case REPORT_RESULT_PARTITION_FAILURE: {
-                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
+                            (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
                     workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction rsf =
+                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                     workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
                             rsf.getDeploymentId(), rsf.getNodeId()));
                     return;
@@ -609,7 +624,6 @@
 
                                 @Override
                                 public void setException(Exception e) {
-
                                 }
                             }));
                     return;