[NO ISSUE][COMP] Reduce memory requirements for metadata only queries

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Queries over metadata datasets should run with minimal memory requirements
- Introduce AsterixOptimizationContext which extends AlgebricksOptimizationContext
  and contains information required by Asterix optimizer rules
- Implement equals()/hashCode() for DataSourceId
- Add "plans" testcase type

Change-Id: I0d8902f88f61953e839e2fa0b06a77c324edbc57
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3385
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index ee0bb5e..180e0b5 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -254,5 +254,9 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-http</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java
new file mode 100644
index 0000000..82ab296
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.base;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeComputer;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+public final class AsterixOptimizationContext extends AlgebricksOptimizationContext {
+
+    private final Int2ObjectMap<Set<DataSourceId>> dataSourceMap = new Int2ObjectOpenHashMap<>();
+
+    public AsterixOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+            IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
+            IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
+            AlgebricksPartitionConstraint clusterLocations, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+        super(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
+                nullableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig, clusterLocations,
+                prettyPrintVisitor);
+    }
+
+    public void addDataSource(DataSource dataSource) {
+        byte type = dataSource.getDatasourceType();
+        Set<DataSourceId> set = dataSourceMap.get(type);
+        if (set == null) {
+            set = new HashSet<>();
+            dataSourceMap.put(type, set);
+        }
+        set.add(dataSource.getId());
+    }
+
+    public Int2ObjectMap<Set<DataSourceId>> getDataSourceMap() {
+        return dataSourceMap;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 531cbf3..e4c35e4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.optimizer.rules.ExtractOrderExpressionsRule;
 import org.apache.asterix.optimizer.rules.ExtractWindowExpressionsRule;
 import org.apache.asterix.optimizer.rules.FeedScanCollectionToUnnest;
+import org.apache.asterix.optimizer.rules.FindDataSourcesRule;
 import org.apache.asterix.optimizer.rules.FixReplicateOperatorOutputsRule;
 import org.apache.asterix.optimizer.rules.FullTextContainsParameterCheckRule;
 import org.apache.asterix.optimizer.rules.FuzzyEqRule;
@@ -77,6 +78,7 @@
 import org.apache.asterix.optimizer.rules.RemoveSortInFeedIngestionRule;
 import org.apache.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
 import org.apache.asterix.optimizer.rules.RewriteDistinctAggregateRule;
+import org.apache.asterix.optimizer.rules.SetAsterixMemoryRequirementsRule;
 import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import org.apache.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
 import org.apache.asterix.optimizer.rules.SetupCommitExtensionOpRule;
@@ -134,7 +136,6 @@
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
-import org.apache.hyracks.algebricks.rewriter.rules.SetMemoryRequirementsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateIsomorphicSubplanRule;
@@ -352,6 +353,7 @@
     public static final List<IAlgebraicRewriteRule> buildDataExchangeRuleCollection() {
         List<IAlgebraicRewriteRule> dataExchange = new LinkedList<>();
         dataExchange.add(new SetExecutionModeRule());
+        dataExchange.add(new FindDataSourcesRule());
         return dataExchange;
     }
 
@@ -361,7 +363,7 @@
         //Turned off the following rule for now not to change OptimizerTest results.
         physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
-        physicalRewritesAllLevels.add(new SetMemoryRequirementsRule());
+        physicalRewritesAllLevels.add(new SetAsterixMemoryRequirementsRule());
         // must run after SetMemoryRequirementsRule
         physicalRewritesAllLevels.add(new HybridToInMemoryHashJoinRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
@@ -412,7 +414,7 @@
         prepareForJobGenRewrites.add(new ReinferAllTypesRule());
         prepareForJobGenRewrites.add(new PushGroupByIntoSortRule());
         prepareForJobGenRewrites.add(new SetExecutionModeRule());
-        prepareForJobGenRewrites.add(new SetMemoryRequirementsRule());
+        prepareForJobGenRewrites.add(new SetAsterixMemoryRequirementsRule());
         prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions());
         prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
         return prepareForJobGenRewrites;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FindDataSourcesRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FindDataSourcesRule.java
new file mode 100644
index 0000000..7a1f09a
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/FindDataSourcesRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules;
+
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Finds data sources used by {@link LogicalOperatorTag#DATASOURCESCAN DATASOURCESCAN} operators and
+ * adds them to the optimization context to be used later by {@link SetAsterixMemoryRequirementsRule}
+ */
+public final class FindDataSourcesRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+            DataSourceScanOperator scanOp = (DataSourceScanOperator) op;
+            DataSource dataSource = (DataSource) scanOp.getDataSource();
+            AsterixOptimizationContext ctx = (AsterixOptimizationContext) context;
+            ctx.addDataSource(dataSource);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixMemoryRequirementsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixMemoryRequirementsRule.java
new file mode 100644
index 0000000..a850cf4
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixMemoryRequirementsRule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules;
+
+import java.util.Set;
+
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.rewriter.rules.SetMemoryRequirementsRule;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+
+/**
+ * This rule extends {@link SetMemoryRequirementsRule} and modifies its functionality as follows:
+ * <ul>
+ * <li>It skips memory requirements configuration if the query operates only on metadata datasets.
+ * In this case operators will retain their default (minimal) memory requirements.
+ * </li>
+ * </ul>
+ */
+public final class SetAsterixMemoryRequirementsRule extends SetMemoryRequirementsRule {
+
+    @Override
+    protected ILogicalOperatorVisitor<Void, Void> createMemoryRequirementsConfigurator(IOptimizationContext context) {
+        return forceMinMemoryBudget((AsterixOptimizationContext) context) ? null
+                : super.createMemoryRequirementsConfigurator(context);
+    }
+
+    private boolean forceMinMemoryBudget(AsterixOptimizationContext context) {
+        Int2ObjectMap<Set<DataSourceId>> dataSourceMap = context.getDataSourceMap();
+        if (dataSourceMap.size() == 1) {
+            Set<DataSourceId> dataSources = dataSourceMap.get(DataSource.Type.INTERNAL_DATASET);
+            return dataSources != null && dataSources.stream()
+                    .allMatch(dsId -> MetadataConstants.METADATA_DATAVERSE_NAME.equals(dsId.getDataverseName()));
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index d155756..81151d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -68,6 +68,7 @@
 import org.apache.asterix.lang.sqlpp.rewrites.SqlppQueryRewriter;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
 import org.apache.asterix.optimizer.base.FuzzyUtils;
 import org.apache.asterix.optimizer.rules.am.AbstractIntroduceAccessMethodRule;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
@@ -98,7 +99,6 @@
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitorJson;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
-import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -167,9 +167,10 @@
                 IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
                 IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig,
                 AlgebricksPartitionConstraint clusterLocations) {
-            return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
+            LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new LogicalOperatorPrettyPrintVisitor();
+            return new AsterixOptimizationContext(varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
-                    conflictingTypeResolver, physicalOptimizationConfig, clusterLocations);
+                    conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrintVisitor);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 0819443..864a339 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -90,6 +90,10 @@
         return extract(resultStream, EnumSet.of(ResultField.METRICS), resultCharset);
     }
 
+    public static InputStream extractPlans(InputStream resultStream, Charset resultCharset) throws Exception {
+        return extract(resultStream, EnumSet.of(ResultField.PLANS), resultCharset);
+    }
+
     public static String extractHandle(InputStream resultStream, Charset responseCharset) throws Exception {
         String result = IOUtils.toString(resultStream, responseCharset);
         ObjectNode resultJson = OBJECT_MAPPER.readValue(result, ObjectNode.class);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 217b065..5d5abdf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -158,6 +158,7 @@
     public static final String DELIVERY_IMMEDIATE = "immediate";
     public static final String DIAGNOSE = "diagnose";
     private static final String METRICS_QUERY_TYPE = "metrics";
+    private static final String PLANS_QUERY_TYPE = "plans";
 
     private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
     private static Map<String, InetSocketAddress> ncEndPoints;
@@ -940,6 +941,7 @@
             case "parse":
             case "deferred":
             case "metrics":
+            case "plans":
                 // isDmlRecoveryTest: insert Crash and Recovery
                 if (isDmlRecoveryTest) {
                     executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
@@ -1273,6 +1275,9 @@
                 case METRICS_QUERY_TYPE:
                     resultStream = ResultExtractor.extractMetrics(resultStream, responseCharset);
                     break;
+                case PLANS_QUERY_TYPE:
+                    resultStream = ResultExtractor.extractPlans(resultStream, responseCharset);
+                    break;
                 default:
                     resultStream = ResultExtractor.extract(resultStream, responseCharset);
                     break;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_01/metadata_only_01.1.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_01/metadata_only_01.1.plans.sqlpp
new file mode 100644
index 0000000..6862b8f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/metadata_only_01/metadata_only_01.1.plans.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- param job:string=true
+
+SET `compiler.parallelism` "1";
+
+from Metadata.`Dataset` ds
+join Metadata.`Index` idx
+on ds.DataverseName = idx.DataverseName and ds.DatasetName = idx.DatasetName
+where contains(ds.DataverseName, "Metadata")
+  and ds.DatasetName in ["Dataverse", "Dataset", "Index"]
+  and idx.IsPrimary
+group by ds.DataverseName
+select value count(*)
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/metadata_only_01/metadata_only_01.1.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/metadata_only_01/metadata_only_01.1.regex
new file mode 100644
index 0000000..94ff0a4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/metadata_only_01/metadata_only_01.1.regex
@@ -0,0 +1 @@
+/memory\D+917504/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9a561fc..f580896 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5792,6 +5792,11 @@
         <output-dir compare="Text">dump_index</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="misc">
+      <compilation-unit name="metadata_only_01">
+        <output-dir compare="Text">metadata_only_01</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index">
     <test-group name="index/validations">
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
index 0c3f942..c96fcd1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DataSourceId.java
@@ -19,7 +19,9 @@
 
 package org.apache.asterix.metadata.declared;
 
-public class DataSourceId {
+import java.util.Objects;
+
+public final class DataSourceId {
 
     private String dataverseName;
     private String datasourceName;
@@ -41,4 +43,19 @@
     public String getDatasourceName() {
         return datasourceName;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        DataSourceId that = (DataSourceId) o;
+        return Objects.equals(dataverseName, that.dataverseName) && Objects.equals(datasourceName, that.datasourceName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataverseName, datasourceName);
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 58e95bb..25a4c7a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -90,16 +90,6 @@
 
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
-            IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
-            IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
-            AlgebricksPartitionConstraint clusterLocations) {
-        this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                missableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig, clusterLocations,
-                new LogicalOperatorPrettyPrintVisitor());
-    }
-
-    public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
-            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer,
             IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig,
             AlgebricksPartitionConstraint clusterLocations, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
index 4cecb07..3a18490 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -97,11 +97,11 @@
         if (physOp.getLocalMemoryRequirements() != null) {
             return false;
         }
-        computeLocalMemoryRequirements(op, context, createMemoryRequirementsConfigurator(context));
+        computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context));
         return true;
     }
 
-    private void computeLocalMemoryRequirements(AbstractLogicalOperator op, IOptimizationContext context,
+    private void computeLocalMemoryRequirements(AbstractLogicalOperator op,
             ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) throws AlgebricksException {
         IPhysicalOperator physOp = op.getPhysicalOperator();
         if (physOp.getLocalMemoryRequirements() == null) {
@@ -109,20 +109,21 @@
             if (physOp.getLocalMemoryRequirements() == null) {
                 throw new IllegalStateException(physOp.getOperatorTag().toString());
             }
-            op.accept(memoryRequirementsVisitor, null);
+            if (memoryRequirementsVisitor != null) {
+                op.accept(memoryRequirementsVisitor, null);
+            }
         }
         if (op.hasNestedPlans()) {
             AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> root : p.getRoots()) {
-                    computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(), context,
-                            createMemoryRequirementsConfigurator(context));
+                    computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(),
+                            memoryRequirementsVisitor);
                 }
             }
         }
         for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
-            computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), context,
-                    createMemoryRequirementsConfigurator(context));
+            computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 8b385e3..3b4bcb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -406,6 +406,10 @@
         });
         jjob.set("connectors", jcArray);
 
+        if (requiredClusterCapacity != null) {
+            jjob.set("required-capacity", requiredClusterCapacity.toJSON());
+        }
+
         return jjob;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
index 6b03968..c91c40c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/ClusterCapacity.java
@@ -27,6 +27,9 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.util.StorageUtil;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class ClusterCapacity implements IClusterCapacity {
     private static final long serialVersionUID = 3487998182013966747L;
 
@@ -123,4 +126,13 @@
         return "(memory: " + StorageUtil.toHumanReadableSize(aggregatedMemoryByteSize) + " bytes, CPU cores: "
                 + aggregatedCores + ")";
     }
+
+    @Override
+    public ObjectNode toJSON() {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jcc = om.createObjectNode();
+        jcc.put("memory", aggregatedMemoryByteSize);
+        jcc.put("cores", aggregatedCores);
+        return jcc;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
index 59b6bfd..7027c83 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IReadOnlyClusterCapacity.java
@@ -23,6 +23,8 @@
 
 import org.apache.hyracks.api.exceptions.HyracksException;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * This interface provides read-only methods for the capacity of a cluster.
  */
@@ -61,4 +63,8 @@
      */
     int getCores(String nodeId) throws HyracksException;
 
+    /**
+     * Translates this cluster capacity to JSON.
+     */
+    ObjectNode toJSON();
 }