[NO ISSUE][COMP] Make memory requirements an operator property

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Refactor how memory requirements are maintained in a query plan
- Introduce LocalMemoryRequirements class. Its instances are held
  by each physical operator and can be altered by the optimizer.
- Introduce optimizer rule SetMemoryRequirementsRule that
  initializes and configures memory requirements for each operator
  (a usage sketch follows below)
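
A minimal usage sketch of the new API (not part of this change; the
MemoryRequirementsSketch class and requiredMemoryInBytes method are
hypothetical illustrations, only the IPhysicalOperator and
LocalMemoryRequirements calls are taken from the patch below):

    import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
    import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
    import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
    import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;

    final class MemoryRequirementsSketch {
        // Initialize an operator's default memory requirements and report its
        // byte budget, the same way OperatorResourcesComputer consumes it.
        static long requiredMemoryInBytes(ILogicalOperator op, int frameSize) {
            IPhysicalOperator physOp = ((AbstractLogicalOperator) op).getPhysicalOperator();
            // Normally done once per operator by SetMemoryRequirementsRule.
            physOp.createLocalMemoryRequirements(op);
            LocalMemoryRequirements reqs = physOp.getLocalMemoryRequirements();
            // The budget is tracked in frames; bytes are derived from the frame size.
            return reqs.getMemoryBudgetInBytes(frameSize);
        }
    }

SetMemoryRequirementsRule performs the createLocalMemoryRequirements()
step for every operator in a plan, so physical operators such as
ExternalGroupByPOperator only read their budget at job-generation time
via getMemoryBudgetInFrames()/getMemoryBudgetInBytes().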

Change-Id: I3481ddfe163c6ce786290c540cbd05db16a7f64f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3374
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 98feafa..491911b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -18,9 +18,6 @@
  */
 package org.apache.asterix.algebra.operators.physical;
 
-import java.util.Map;
-
-import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.DataSourceId;
@@ -50,6 +47,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -68,6 +66,11 @@
  * inverted-index search.
  */
 public class InvertedIndexPOperator extends IndexSearchPOperator {
+
+    // variable memory, min 5 frames
+    // 1 for query + 2 for intermediate results + 1 for final result + 1 for reading an inverted list
+    public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_TEXT_SEARCH;
+
     private final boolean isPartitioned;
 
     public InvertedIndexPOperator(IDataSourceIndex<String, DataSourceId> idx, INodeDomain domain,
@@ -86,6 +89,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH);
+    }
+
+    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
@@ -114,9 +122,7 @@
             retainNull = true;
         }
         // In-memory budget (frame limit) for inverted-index search operations
-        CompilerProperties compilerProp = metadataProvider.getApplicationContext().getCompilerProperties();
-        Map<String, Object> queryConfig = metadataProvider.getConfig();
-        int frameLimit = OptimizationConfUtil.getTextSearchNumFrames(compilerProp, queryConfig, op.getSourceLocation());
+        int frameLimit = localMemoryRequirements.getMemoryBudgetInFrames();
 
         // Build runtime.
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch =
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 6a11abf..531cbf3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -105,6 +105,7 @@
 import org.apache.hyracks.algebricks.rewriter.rules.ExtractGbyExpressionsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ExtractGroupByDecorVariablesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.FactorRedundantGroupAndDecorVarsRule;
+import org.apache.hyracks.algebricks.rewriter.rules.HybridToInMemoryHashJoinRule;
 import org.apache.hyracks.algebricks.rewriter.rules.InferTypesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.InlineAssignIntoAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.InlineSingleReferenceVariablesRule;
@@ -133,6 +134,7 @@
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReuseWindowAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
+import org.apache.hyracks.algebricks.rewriter.rules.SetMemoryRequirementsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SwitchInnerJoinBranchRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateIsomorphicSubplanRule;
@@ -359,6 +361,9 @@
         //Turned off the following rule for now not to change OptimizerTest results.
         physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
+        physicalRewritesAllLevels.add(new SetMemoryRequirementsRule());
+        // must run after SetMemoryRequirementsRule
+        physicalRewritesAllLevels.add(new HybridToInMemoryHashJoinRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
         physicalRewritesAllLevels.add(new CheckFullParallelSortRule());
         physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule(BuiltinFunctions.RANGE_MAP,
@@ -403,11 +408,11 @@
                 .add(new IsolateHyracksOperatorsRule(HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));
         prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
         prepareForJobGenRewrites.add(new ExtractCommonOperatorsRule());
-        // Re-infer all types, so that, e.g., the effect of not-is-null is
-        // propagated.
+        // Re-infer all types, so that, e.g., the effect of not-is-null is propagated
         prepareForJobGenRewrites.add(new ReinferAllTypesRule());
         prepareForJobGenRewrites.add(new PushGroupByIntoSortRule());
         prepareForJobGenRewrites.add(new SetExecutionModeRule());
+        prepareForJobGenRewrites.add(new SetMemoryRequirementsRule());
         prepareForJobGenRewrites.add(new SweepIllegalNonfunctionalFunctions());
         prepareForJobGenRewrites.add(new FixReplicateOperatorOutputsRule());
         return prepareForJobGenRewrites;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
index e64889d..51e536a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushLimitIntoOrderByRule.java
@@ -35,7 +35,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 
 /**
  * If an ORDER operator is followed by LIMIT, then we can push LIMIT into ORDER operator.
@@ -89,7 +88,6 @@
      */
     private boolean pushLimitIntoOrder(Mutable<ILogicalOperator> opRef, Mutable<ILogicalOperator> opRef2,
             IOptimizationContext context) throws AlgebricksException {
-        PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
         LimitOperator limitOp = (LimitOperator) opRef.getValue();
         OrderOperator orderOp = (OrderOperator) opRef2.getValue();
 
@@ -106,8 +104,7 @@
         // Create the new ORDER operator, set the topK value, and replace the current one.
         OrderOperator newOrderOp = new OrderOperator(orderOp.getOrderExpressions(), topK);
         newOrderOp.setSourceLocation(orderOp.getSourceLocation());
-        newOrderOp.setPhysicalOperator(
-                new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), newOrderOp.getTopK()));
+        newOrderOp.setPhysicalOperator(new StableSortPOperator(newOrderOp.getTopK()));
         newOrderOp.getInputs().addAll(orderOp.getInputs());
         newOrderOp.setExecutionMode(orderOp.getExecutionMode());
         newOrderOp.recomputeSchema();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 69eecfd..31de7ee 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -130,11 +130,7 @@
             }
 
             generateMergeAggregationExpressions(gby);
-
-            return new ExternalGroupByPOperator(gby.getGroupByVarList(),
-                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                            * physicalOptimizationConfig.getFrameSize());
+            return new ExternalGroupByPOperator(gby.getGroupByVarList());
         }
 
         private void generateMergeAggregationExpressions(GroupByOperator gby) throws AlgebricksException {
@@ -252,12 +248,11 @@
                 boolean nestedTrivialAggregates =
                         winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan);
                 return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(),
-                        frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates,
-                        context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+                        frameStartIsMonotonic, frameEndIsMonotonic, nestedTrivialAggregates);
             } else if (AnalysisUtil.hasFunctionWithProperty(winOp,
                     BuiltinFunctions.WindowFunctionProperty.MATERIALIZE_PARTITION)) {
-                return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false, false,
-                        context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+                return new WindowPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList(), false, false,
+                        false);
             } else {
                 return new WindowStreamPOperator(winOp.getPartitionVarList(), winOp.getOrderColumnList());
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index a2c1c33..e8bf46a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -20,11 +20,11 @@
 
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 
 public class OperatorResourcesComputer {
 
@@ -32,21 +32,10 @@
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
 
     private final int numComputationPartitions;
-    private final long groupByMemorySize;
-    private final long joinMemorySize;
-    private final long sortMemorySize;
-    private final long windowMemorySize;
-    private final long textSearchMemorySize;
     private final long frameSize;
 
-    public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit,
-            int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit, long frameSize) {
+    public OperatorResourcesComputer(int numComputationPartitions, long frameSize) {
         this.numComputationPartitions = numComputationPartitions;
-        this.groupByMemorySize = groupFrameLimit * frameSize;
-        this.joinMemorySize = joinFrameLimit * frameSize;
-        this.sortMemorySize = sortFrameLimit * frameSize;
-        this.windowMemorySize = windowFrameLimit * frameSize;
-        this.textSearchMemorySize = textSearchFrameLimit * frameSize;
         this.frameSize = frameSize;
     }
 
@@ -59,79 +48,25 @@
     }
 
     public long getOperatorRequiredMemory(ILogicalOperator operator) {
-        switch (operator.getOperatorTag()) {
-            case AGGREGATE:
-            case ASSIGN:
-            case DATASOURCESCAN:
-            case DISTINCT:
-            case DISTRIBUTE_RESULT:
-            case EMPTYTUPLESOURCE:
-            case DELEGATE_OPERATOR:
-            case EXTERNAL_LOOKUP:
-            case LIMIT:
-            case MATERIALIZE:
-            case NESTEDTUPLESOURCE:
-            case PROJECT:
-            case REPLICATE:
-            case RUNNINGAGGREGATE:
-            case SCRIPT:
-            case SELECT:
-            case SINK:
-            case SPLIT:
-            case SUBPLAN:
-            case TOKENIZE:
-            case UNIONALL:
-            case UNNEST:
-            case LEFT_OUTER_UNNEST:
-            case UPDATE:
-            case WRITE:
-            case WRITE_RESULT:
-            case INDEX_INSERT_DELETE_UPSERT:
-            case INSERT_DELETE_UPSERT:
-            case INTERSECT:
-            case FORWARD:
-                return getOperatorRequiredMemory(operator, frameSize);
-            case LEFT_OUTER_UNNEST_MAP:
-            case UNNEST_MAP:
-                // Since an inverted-index search requires certain amount of memory, needs to calculate
-                // the memory size differently if the given index-search is an inverted-index search.
-                long unnestMapMemorySize = frameSize;
-                if (isInvertedIndexSearch((AbstractUnnestMapOperator) operator)) {
-                    unnestMapMemorySize += textSearchMemorySize;
-                }
-                return getOperatorRequiredMemory(operator, unnestMapMemorySize);
-            case EXCHANGE:
-                return getExchangeRequiredMemory((ExchangeOperator) operator);
-            case GROUP:
-                return getOperatorRequiredMemory(operator, groupByMemorySize);
-            case ORDER:
-                return getOperatorRequiredMemory(operator, sortMemorySize);
-            case INNERJOIN:
-            case LEFTOUTERJOIN:
-                return getOperatorRequiredMemory(operator, joinMemorySize);
-            case WINDOW:
-                return getWindowRequiredMemory((WindowOperator) operator);
-            default:
-                throw new IllegalStateException("Unrecognized operator: " + operator.getOperatorTag());
+        if (operator.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+            return getExchangeRequiredMemory((ExchangeOperator) operator);
+        } else {
+            IPhysicalOperator physOp = ((AbstractLogicalOperator) operator).getPhysicalOperator();
+            return getOperatorRequiredMemory(operator.getExecutionMode(), physOp.getLocalMemoryRequirements());
         }
     }
 
-    private long getOperatorRequiredMemory(ILogicalOperator op, long memorySize) {
-        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
-                || op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.LOCAL) {
+    private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode, long memorySize) {
+        if (opExecMode == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                || opExecMode == AbstractLogicalOperator.ExecutionMode.LOCAL) {
             return memorySize * numComputationPartitions;
         }
         return memorySize;
     }
 
-    private boolean isInvertedIndexSearch(AbstractUnnestMapOperator op) {
-        IPhysicalOperator physicalOperator = op.getPhysicalOperator();
-        final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
-        if (physicalOperatorTag == PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH
-                || physicalOperatorTag == PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH) {
-            return true;
-        }
-        return false;
+    private long getOperatorRequiredMemory(AbstractLogicalOperator.ExecutionMode opExecMode,
+            LocalMemoryRequirements memoryReqs) {
+        return getOperatorRequiredMemory(opExecMode, memoryReqs.getMemoryBudgetInBytes(frameSize));
     }
 
     private long getExchangeRequiredMemory(ExchangeOperator op) {
@@ -139,16 +74,8 @@
         final PhysicalOperatorTag physicalOperatorTag = physicalOperator.getOperatorTag();
         if (physicalOperatorTag == PhysicalOperatorTag.ONE_TO_ONE_EXCHANGE
                 || physicalOperatorTag == PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
-            return getOperatorRequiredMemory(op, frameSize);
+            return getOperatorRequiredMemory(op.getExecutionMode(), frameSize);
         }
         return 2L * MAX_BUFFER_PER_CONNECTION * numComputationPartitions * numComputationPartitions * frameSize;
     }
-
-    private long getWindowRequiredMemory(WindowOperator op) {
-        // memory budget configuration only applies to window operators that materialize partitions (non-streaming)
-        // streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns
-        long memorySize = op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW_STREAM ? 2 * frameSize
-                : windowMemorySize;
-        return getOperatorRequiredMemory(op, memorySize);
-    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 0f4c4c0..ba11956 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -55,14 +55,8 @@
             AlgebricksAbsolutePartitionConstraint computationLocations,
             PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
         final int frameSize = physicalOptimizationConfig.getFrameSize();
-        final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
-        final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
-        final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
-        final int windowFrameLimit = physicalOptimizationConfig.getMaxFramesForWindow();
-        final int textSearchFrameLimit = physicalOptimizationConfig.getMaxFramesForTextSearch();
         final List<PlanStage> planStages = getStages(plan);
-        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit,
-                groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
+        return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, frameSize);
     }
 
     public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException {
@@ -74,15 +68,13 @@
     }
 
     public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations,
-            int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit,
             int frameSize) {
-        final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit,
-                groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize);
+        final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, frameSize);
         final IClusterCapacity clusterCapacity = new ClusterCapacity();
-        final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
+        final long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max()
                 .orElseThrow(IllegalStateException::new);
         clusterCapacity.setAggregatedMemoryByteSize(maxRequiredMemory);
-        final Integer maxRequireCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max()
+        final int maxRequireCores = stages.stream().mapToInt(stage -> stage.getRequiredCores(computer)).max()
                 .orElseThrow(IllegalStateException::new);
         clusterCapacity.setAggregatedCores(maxRequireCores);
         return clusterCapacity;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index 094009e..b8beccc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
@@ -51,10 +52,20 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedLoopJoinPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.junit.Assert;
@@ -64,9 +75,7 @@
 
     private static final Set<LogicalOperatorTag> BLOCKING_OPERATORS =
             new HashSet<>(Arrays.asList(INNERJOIN, LEFTOUTERJOIN, ORDER));
-    private static final long MEMORY_BUDGET = 33554432L;
     private static final int FRAME_SIZE = 32768;
-    private static final int FRAME_LIMIT = (int) (MEMORY_BUDGET / FRAME_SIZE);
     private static final int PARALLELISM = 10;
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
 
@@ -74,9 +83,11 @@
     public void noBlockingPlan() throws AlgebricksException {
         EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
         ets.setExecutionMode(UNPARTITIONED);
+        ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         AssignOperator assignOperator = new AssignOperator(Collections.emptyList(), null);
         assignOperator.setExecutionMode(UNPARTITIONED);
+        assignOperator.setPhysicalOperator(new AssignPOperator());
         assignOperator.getInputs().add(new MutableObject<>(ets));
 
         ExchangeOperator exchange = new ExchangeOperator();
@@ -86,8 +97,9 @@
 
         DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(UNPARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(exchange));
-        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
 
         List<PlanStage> stages = ResourceUtils.getStages(plan);
         // ensure a single stage plan
@@ -103,9 +115,11 @@
     public void testNonBlockingGroupByOrderBy() throws AlgebricksException {
         EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
         ets.setExecutionMode(PARTITIONED);
+        ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator.getInputs().add(new MutableObject<>(ets));
 
         ExchangeOperator exchange = new ExchangeOperator();
@@ -115,18 +129,19 @@
 
         GroupByOperator groupByOperator = new GroupByOperator();
         groupByOperator.setExecutionMode(PARTITIONED);
-        groupByOperator
-                .setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true, FRAME_LIMIT));
+        groupByOperator.setPhysicalOperator(new PreclusteredGroupByPOperator(Collections.emptyList(), true));
         groupByOperator.getInputs().add(new MutableObject<>(exchange));
 
         OrderOperator orderOperator = new OrderOperator();
         orderOperator.setExecutionMode(PARTITIONED);
+        orderOperator.setPhysicalOperator(new StableSortPOperator());
         orderOperator.getInputs().add(new MutableObject<>(groupByOperator));
 
         DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(orderOperator));
-        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
 
         final List<PlanStage> stages = ResourceUtils.getStages(plan);
         validateStages(stages, ets, exchange, groupByOperator, orderOperator, resultOperator);
@@ -136,8 +151,10 @@
 
         // dominating stage should have orderBy, orderBy's input (groupby), groupby's input (exchange),
         // exchange's input (scanOperator), and scanOperator's input (ets)
-        long orderOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
-        long groupByOperatorRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long orderOperatorRequiredMemory =
+                AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
+        long groupByOperatorRequiredMemory =
+                AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY * FRAME_SIZE * PARALLELISM;
         long exchangeRequiredMemory = PARALLELISM * FRAME_SIZE;
         long scanOperatorRequiredMemory = PARALLELISM * FRAME_SIZE;
         long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
@@ -151,20 +168,26 @@
     public void testJoinGroupby() throws AlgebricksException {
         EmptyTupleSourceOperator ets1 = new EmptyTupleSourceOperator();
         ets1.setExecutionMode(PARTITIONED);
+        ets1.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator1 = new DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator1.setExecutionMode(PARTITIONED);
+        scanOperator1.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator1.getInputs().add(new MutableObject<>(ets1));
 
         EmptyTupleSourceOperator ets2 = new EmptyTupleSourceOperator();
-        ets1.setExecutionMode(PARTITIONED);
+        ets2.setExecutionMode(PARTITIONED);
+        ets2.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator2 = new DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator2.setExecutionMode(PARTITIONED);
+        scanOperator2.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator2.getInputs().add(new MutableObject<>(ets2));
 
         InnerJoinOperator firstJoin = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
         firstJoin.setExecutionMode(PARTITIONED);
+        firstJoin.setPhysicalOperator(new NestedLoopJoinPOperator(firstJoin.getJoinKind(),
+                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
         firstJoin.getInputs().add(new MutableObject<>(scanOperator1));
         firstJoin.getInputs().add(new MutableObject<>(scanOperator2));
 
@@ -174,11 +197,11 @@
         exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
 
         EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
-        ets1.setExecutionMode(PARTITIONED);
+        ets3.setExecutionMode(PARTITIONED);
+        ets3.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         GroupByOperator groupByOperator = new GroupByOperator();
-        groupByOperator
-                .setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList(), FRAME_LIMIT, FRAME_LIMIT));
+        groupByOperator.setPhysicalOperator(new ExternalGroupByPOperator(Collections.emptyList()));
         groupByOperator.setExecutionMode(LOCAL);
         groupByOperator.getInputs().add(new MutableObject<>(ets3));
 
@@ -189,13 +212,16 @@
 
         LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
         secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.setPhysicalOperator(new NestedLoopJoinPOperator(secondJoin.getJoinKind(),
+                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
         secondJoin.getInputs().add(new MutableObject<>(exchangeOperator1));
         secondJoin.getInputs().add(new MutableObject<>(exchangeOperator2));
 
         DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(secondJoin));
-        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
 
         List<PlanStage> stages = ResourceUtils.getStages(plan);
         final int expectedStages = 4;
@@ -207,9 +233,9 @@
         // resultOperator, its input (secondJoin), secondJoin's first input (exchangeOperator1), exchangeOperator1's
         // input (firstJoin), firstJoin's first input (scanOperator1), and scanOperator1's input (ets1)
         long resultOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
-        long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long secondJoinRequiredMemory = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN * FRAME_SIZE * PARALLELISM;
         long exchangeOperator1RequiredMemory = 2 * MAX_BUFFER_PER_CONNECTION * PARALLELISM * PARALLELISM * FRAME_SIZE;
-        long firstJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long firstJoinRequiredMemory = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN * FRAME_SIZE * PARALLELISM;
         long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
         long ets1RequiredMemory = FRAME_SIZE * PARALLELISM;
 
@@ -222,34 +248,40 @@
     public void testReplicateSortJoin() throws AlgebricksException {
         EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
         ets.setExecutionMode(PARTITIONED);
+        ets.setPhysicalOperator(new EmptyTupleSourcePOperator());
 
         DataSourceScanOperator scanOperator = new DataSourceScanOperator(Collections.emptyList(), null);
         scanOperator.setExecutionMode(PARTITIONED);
+        scanOperator.setPhysicalOperator(new DataSourceScanPOperator(null));
         scanOperator.getInputs().add(new MutableObject<>(ets));
 
         ReplicateOperator replicateOperator = new ReplicateOperator(2);
         replicateOperator.setExecutionMode(PARTITIONED);
+        replicateOperator.setPhysicalOperator(new ReplicatePOperator());
         replicateOperator.getInputs().add(new MutableObject<>(scanOperator));
 
         OrderOperator order1 = new OrderOperator();
         order1.setExecutionMode(PARTITIONED);
-        order1.setPhysicalOperator(new OneToOneExchangePOperator());
+        order1.setPhysicalOperator(new StableSortPOperator());
         order1.getInputs().add(new MutableObject<>(replicateOperator));
 
         OrderOperator order2 = new OrderOperator();
         order2.setExecutionMode(PARTITIONED);
-        order2.setPhysicalOperator(new OneToOneExchangePOperator());
+        order2.setPhysicalOperator(new StableSortPOperator());
         order2.getInputs().add(new MutableObject<>(replicateOperator));
 
         LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE));
         secondJoin.setExecutionMode(PARTITIONED);
+        secondJoin.setPhysicalOperator(new NestedLoopJoinPOperator(secondJoin.getJoinKind(),
+                AbstractJoinPOperator.JoinPartitioningType.BROADCAST));
         secondJoin.getInputs().add(new MutableObject<>(order1));
         secondJoin.getInputs().add(new MutableObject<>(order2));
 
         DistributeResultOperator resultOperator = new DistributeResultOperator(null, null);
         resultOperator.setExecutionMode(PARTITIONED);
+        resultOperator.setPhysicalOperator(new DistributeResultPOperator());
         resultOperator.getInputs().add(new MutableObject<>(secondJoin));
-        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject(resultOperator)));
+        ALogicalPlanImpl plan = new ALogicalPlanImpl(Collections.singletonList(new MutableObject<>(resultOperator)));
 
         List<PlanStage> stages = ResourceUtils.getStages(plan);
         final int expectedStages = 3;
@@ -257,14 +289,14 @@
         validateStages(stages);
 
         // dominating stage should have the following operators:
-        // secondJoin, secondJoin's second input (order2), order2's input (replicate),
+        // order1, order2, order1 and order2's input (replicate),
         // replicate's input (scanOperator), scanOperator's input (ets)
-        long secondJoinRequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
-        long order2RequiredMemory = FRAME_LIMIT * FRAME_SIZE * PARALLELISM;
+        long order1RequiredMemory = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
+        long order2RequiredMemory = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT * FRAME_SIZE * PARALLELISM;
         long replicateOperatorRequiredMemory = FRAME_SIZE * PARALLELISM;
         long scanOperator1RequiredMemory = FRAME_SIZE * PARALLELISM;
         long etsRequiredMemory = FRAME_SIZE * PARALLELISM;
-        long expectedMemory = secondJoinRequiredMemory + order2RequiredMemory + replicateOperatorRequiredMemory
+        long expectedMemory = order1RequiredMemory + order2RequiredMemory + replicateOperatorRequiredMemory
                 + scanOperator1RequiredMemory + etsRequiredMemory;
         assertRequiredMemory(stages, expectedMemory);
     }
@@ -300,8 +332,13 @@
     }
 
     private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
-        final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM,
-                FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE);
+        for (PlanStage stage : stages) {
+            for (ILogicalOperator op : stage.getOperators()) {
+                ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op);
+            }
+        }
+        final IClusterCapacity clusterCapacity =
+                ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, FRAME_SIZE);
         Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index e8643cd..b29cb61 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -23,6 +23,10 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -30,13 +34,11 @@
 
 public class OptimizationConfUtil {
 
-    private static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
-    private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
-    private static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
-    // 1 (output) + 1 (input copy) + 1 (partition writer) + 2 (seekable partition reader)
-    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
-    // one for query, two for intermediate results, one for final result, and one for reading an inverted list
-    private static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5;
+    private static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+    private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+    private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+    public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5; // see InvertedIndexPOperator
 
     private OptimizationConfUtil() {
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a88ec64..a5e22cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 
@@ -51,6 +52,10 @@
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
             throws AlgebricksException;
 
+    public LocalMemoryRequirements getLocalMemoryRequirements();
+
+    public void createLocalMemoryRequirements(ILogicalOperator op);
+
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException;
@@ -72,10 +77,10 @@
     public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op);
 
     /*
-     * This is needed to have a kind of cost based decision on whether to merge the shared subplans and materialize the result.
-     * If the subgraph whose result we would like to materialize has an operator that is computationally expensive, we assume
-     * it is cheaper to materialize the result of this subgraph and read from the file rather than recomputing it.
+     * This is needed to have a kind of cost based decision on whether to merge the shared subplans and materialize
+     * the result. If the subgraph whose result we would like to materialize has an operator that is computationally
+     * expensive, we assume it is cheaper to materialize the result of this subgraph and read from the file rather
+     * than recomputing it.
      */
     public boolean expensiveThanMaterialization();
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
index ce6dedc..17c60b9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -21,17 +21,19 @@
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 
 public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator {
 
+    // variable memory, min 4 frames
+    public static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
+
     protected List<LogicalVariable> columnList;
 
-    protected final int framesLimit;
-
-    protected AbstractGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+    protected AbstractGroupByPOperator(List<LogicalVariable> columnList) {
         this.columnList = columnList;
-        this.framesLimit = framesLimit;
     }
 
     public List<LogicalVariable> getGroupByColumns() {
@@ -48,6 +50,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_GROUP_BY);
+    }
+
+    @Override
     public String toString() {
         return getOperatorTag().toString() + columnList;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index aea9b3e..c300392 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -21,9 +21,13 @@
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 
 public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
 
+    // variable memory, min 5 frames
+    public static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
+
     public enum JoinPartitioningType {
         PAIRWISE,
         BROADCAST
@@ -56,4 +60,9 @@
     public boolean expensiveThanMaterialization() {
         return true;
     }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_JOIN);
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 29d6037..96fbe3e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -50,6 +51,7 @@
 public abstract class AbstractPhysicalOperator implements IPhysicalOperator {
 
     protected IPhysicalPropertiesVector deliveredProperties;
+    protected LocalMemoryRequirements localMemoryRequirements;
     private boolean disableJobGenBelow = false;
     private Object hostQueryContext;
 
@@ -88,6 +90,16 @@
     }
 
     @Override
+    public LocalMemoryRequirements getLocalMemoryRequirements() {
+        return localMemoryRequirements;
+    }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(1);
+    }
+
+    @Override
     public void disableJobGenBelowMe() {
         this.disableJobGenBelow = true;
     }
@@ -141,14 +153,13 @@
         List<List<AlgebricksPipeline>> subplans = new ArrayList<>(npOp.getNestedPlans().size());
         PlanCompiler pc = new PlanCompiler(context);
         for (ILogicalPlan p : npOp.getNestedPlans()) {
-            subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc));
+            subplans.add(buildPipelineWithProjection(p, outerPlanSchema, opSchema, pc));
         }
         return subplans;
     }
 
     private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
-            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc)
-            throws AlgebricksException {
+            IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots are not supported.");
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 23a411c..fdb7347 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -59,8 +59,8 @@
 
 public abstract class AbstractPreclusteredGroupByPOperator extends AbstractGroupByPOperator {
 
-    protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
-        super(columnList, framesLimit);
+    protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
     }
 
     // Obs: We don't propagate properties corresponding to decors, since they
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 81852d4..eaec772 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
@@ -49,12 +50,13 @@
 
 public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
 
-    final int maxNumberOfFrames;
+    // variable memory, min 3 frames
+    public static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
+
     OrderColumn[] sortColumns;
     ILocalStructuralProperty orderProp;
 
-    AbstractStableSortPOperator(int maxNumberOfFrames) {
-        this.maxNumberOfFrames = maxNumberOfFrames;
+    AbstractStableSortPOperator() {
     }
 
     public OrderColumn[] getSortColumns() {
@@ -163,4 +165,9 @@
                 && clusterDomain.cardinality() != null && clusterDomain.cardinality() > 1
                 && ctx.getPhysicalOptimizationConfig().getSortParallel();
     }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_SORT);
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 927beae..bf0a824 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -68,11 +68,8 @@
 
 public class ExternalGroupByPOperator extends AbstractGroupByPOperator {
 
-    private final long inputSize;
-
-    public ExternalGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, long fileSize) {
-        super(columnList, framesLimit);
-        this.inputSize = fileSize;
+    public ExternalGroupByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
     }
 
     @Override
@@ -238,11 +235,14 @@
                 JobGenHelper.variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
 
         // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
-        int memoryBudgetInBytes = context.getFrameSize() * framesLimit;
+        int frameSize = context.getFrameSize();
+        long memoryBudgetInBytes = localMemoryRequirements.getMemoryBudgetInBytes(frameSize);
         int groupByColumnsCount = gby.getGroupByList().size() + numFds;
         int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
-                groupByColumnsCount, context.getFrameSize());
+                groupByColumnsCount, frameSize);
 
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
+        long inputSize = framesLimit * (long) frameSize;
         ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
                 keyAndDecFields, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
                 mergeFactory, recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 6d8b6e3..8a56fbb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -60,8 +60,6 @@
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
-    // The maximum number of in-memory frames that this hash join can use.
-    private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
     private final double fudgeFactor;
 
@@ -69,17 +67,15 @@
 
     public HybridHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
             List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
-            int memSizeInFrames, int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
+            int maxInputSizeInFrames, int aveRecordsPerFrame, double fudgeFactor) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
-        this.memSizeInFrames = memSizeInFrames;
         this.maxInputBuildSizeInFrames = maxInputSizeInFrames;
         this.fudgeFactor = fudgeFactor;
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("HybridHashJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
                     + partitioningType + ", List<LogicalVariable>=" + sideLeftOfEqualities + ", List<LogicalVariable>="
-                    + sideRightOfEqualities + ", int memSizeInFrames=" + memSizeInFrames
-                    + ", int maxInputSize0InFrames=" + maxInputSizeInFrames + ", int aveRecordsPerFrame="
-                    + aveRecordsPerFrame + ", double fudgeFactor=" + fudgeFactor + ".");
+                    + sideRightOfEqualities + ", int maxInputSize0InFrames=" + maxInputSizeInFrames
+                    + ", int aveRecordsPerFrame=" + aveRecordsPerFrame + ", double fudgeFactor=" + fudgeFactor + ".");
         }
     }
 
@@ -97,10 +93,6 @@
         return fudgeFactor;
     }
 
-    public int getMemSizeInFrames() {
-        return memSizeInFrames;
-    }
-
     @Override
     public String toString() {
         return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
@@ -156,11 +148,12 @@
             IBinaryHashFunctionFamily[] rightHashFunFamilies, IBinaryComparatorFactory[] leftCompFactories,
             IBinaryComparatorFactory[] rightCompFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
             RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) {
+        int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
         switch (kind) {
             case INNER:
-                return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                        maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies,
-                        rightHashFunFamilies, leftCompFactories, rightCompFactories, recDescriptor,
+                return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
+                        getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies,
+                        leftCompFactories, rightCompFactories, recDescriptor,
                         new JoinMultiComparatorFactory(leftCompFactories, keysLeft, keysRight),
                         new JoinMultiComparatorFactory(rightCompFactories, keysRight, keysLeft), predEvaluatorFactory);
             case LEFT_OUTER:
@@ -168,9 +161,9 @@
                 for (int j = 0; j < nonMatchWriterFactories.length; j++) {
                     nonMatchWriterFactories[j] = context.getMissingWriterFactory();
                 }
-                return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                        maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies,
-                        rightHashFunFamilies, leftCompFactories, rightCompFactories, recDescriptor,
+                return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
+                        getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies,
+                        leftCompFactories, rightCompFactories, recDescriptor,
                         new JoinMultiComparatorFactory(leftCompFactories, keysLeft, keysRight),
                         new JoinMultiComparatorFactory(rightCompFactories, keysRight, keysLeft), predEvaluatorFactory,
                         true, nonMatchWriterFactories);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 152fcc6..90ae4cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -50,19 +50,15 @@
 public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
 
     private final int tableSize;
-    // The maximum number of in-memory frames that this hash join can use.
-    private final int memSizeInFrames;
 
     /**
      * builds on the first operator and probes on the second.
      */
 
     public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
-            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize,
-            int memSizeInFrames) {
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.tableSize = tableSize;
-        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -104,14 +100,16 @@
 
         IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
                 context.getPredicateEvaluatorFactoryProvider();
-        IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null ? null
-                : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight));
+        IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
+                : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
 
         RecordDescriptor recDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
 
+        int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
+
         switch (kind) {
             case INNER:
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 350bcfb..2d70abe 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -39,8 +39,8 @@
 
 public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
 
-    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
-        super(columnList, framesLimit);
+    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
     }
 
     @Override
@@ -72,6 +72,7 @@
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
                 comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null, framesLimit);
         runtime.setSourceLocation(gby.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
index 413c1a4..403ca57 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroStableSortPOperator.java
@@ -39,8 +39,7 @@
 
 public class MicroStableSortPOperator extends AbstractStableSortPOperator {
 
-    public MicroStableSortPOperator(int maxNumberOfFrames) {
-        super(maxNumberOfFrames);
+    public MicroStableSortPOperator() {
     }
 
     @Override
@@ -80,6 +79,7 @@
             i++;
         }
 
+        int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames();
         IPushRuntimeFactory runtime = new MicroSortRuntimeFactory(sortFields, nkcf, comps, null, maxNumberOfFrames);
         builder.contributeMicroOperator(op, runtime, recDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
index 048b129..524b336 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -63,11 +63,8 @@
  */
 public class NestedLoopJoinPOperator extends AbstractJoinPOperator {
 
-    private final int memSize;
-
-    public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, int memSize) {
+    public NestedLoopJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType) {
         super(kind, partitioningType);
-        this.memSize = memSize;
     }
 
     @Override
@@ -141,6 +138,7 @@
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc;
 
+        int memSize = localMemoryRequirements.getMemoryBudgetInFrames();
         switch (kind) {
             case INNER:
                 opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
@@ -187,7 +185,6 @@
     }
 
     public static class TuplePairEvaluator implements ITuplePairComparator {
-        private final IHyracksTaskContext ctx;
         private IScalarEvaluator condEvaluator;
         private final IPointable p;
         private final CompositeFrameTupleReference compositeTupleRef;
@@ -197,7 +194,6 @@
 
         public TuplePairEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory condFactory,
                 IBinaryBooleanInspector binaryBooleanInspector) throws HyracksDataException {
-            this.ctx = ctx;
             this.condEvaluator = condFactory.createScalarEvaluator(ctx);
             this.binaryBooleanInspector = binaryBooleanInspector;
             this.leftRef = new FrameTupleReference();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index b6faa36..c6b49c2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -44,8 +44,8 @@
 
     private final boolean groupAll;
 
-    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) {
-        super(columnList, framesLimit);
+    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll) {
+        super(columnList);
         this.groupAll = groupAll;
     }
 
@@ -85,6 +85,7 @@
         RecordDescriptor recordDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
 
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
                 comparatorFactories, aggregatorFactory, recordDescriptor, groupAll, framesLimit);
         opDesc.setSourceLocation(gby.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index af8161f..20e197f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -69,8 +69,8 @@
 
     private final OrderColumn[] orderColumns;
 
-    public SortGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, OrderColumn[] orderColumns) {
-        super(columnList, framesLimit);
+    public SortGroupByPOperator(List<LogicalVariable> columnList, OrderColumn[] orderColumns) {
+        super(columnList);
         this.orderColumns = orderColumns;
     }
 
@@ -249,6 +249,7 @@
         normalizedKeyFactory =
                 orderColumns[0].getOrder() == OrderKind.ASC ? nkcfProvider.getNormalizedKeyComputerFactory(type, true)
                         : nkcfProvider.getNormalizedKeyComputerFactory(type, false);
+        int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, framesLimit, keys,
                 keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
                 partialAggRecordDescriptor, recordDescriptor, false);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 9567e5b..93c5c3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -49,12 +49,11 @@
 
     private final int topK;
 
-    public StableSortPOperator(int maxNumberOfFrames) {
-        this(maxNumberOfFrames, -1);
+    public StableSortPOperator() {
+        this(-1);
     }
 
-    public StableSortPOperator(int maxNumberOfFrames, int topK) {
-        super(maxNumberOfFrames);
+    public StableSortPOperator(int topK) {
         this.topK = topK;
     }
 
@@ -98,6 +97,7 @@
             i++;
         }
 
+        int maxNumberOfFrames = localMemoryRequirements.getMemoryBudgetInFrames();
         AbstractSorterOperatorDescriptor sortOpDesc;
         // topK == -1 means that a topK value is not provided.
         if (topK == -1) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 23853e8..2a0b052 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -21,9 +21,11 @@
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -38,23 +40,22 @@
 
 public final class WindowPOperator extends AbstractWindowPOperator {
 
+    // variable memory, min 5 frames:
+    // 1 (output) + 1 (input copy conservative) + 1 (partition writer) + 2 (seekable partition reader)
+    public static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
+
     private final boolean frameStartIsMonotonic;
 
     private final boolean frameEndIsMonotonic;
 
     private final boolean nestedTrivialAggregates;
 
-    // The maximum number of in-memory frames that this operator can use.
-    private final int memSizeInFrames;
-
     public WindowPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns,
-            boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates,
-            int memSizeInFrames) {
+            boolean frameStartIsMonotonic, boolean frameEndIsMonotonic, boolean nestedTrivialAggregates) {
         super(partitionColumns, orderColumns);
         this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndIsMonotonic = frameEndIsMonotonic;
         this.nestedTrivialAggregates = nestedTrivialAggregates;
-        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -63,6 +64,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_WINDOW);
+    }
+
+    @Override
     protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
@@ -74,6 +80,8 @@
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
             WindowAggregatorDescriptorFactory nestedAggFactory, JobGenContext context) {
 
+        int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
+
         // special cases
         if (!winOp.hasNestedPlans()) {
             return new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
index 33b47ec..68476d1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -21,9 +21,11 @@
 
 import java.util.List;
 
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
@@ -35,6 +37,9 @@
 
 public final class WindowStreamPOperator extends AbstractWindowPOperator {
 
+    // fixed memory, 2 frames: 1 (output) + 1 (input copy conservative)
+    public static final int MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM = 2;
+
     public WindowStreamPOperator(List<LogicalVariable> partitionColumns, List<OrderColumn> orderColumns) {
         super(partitionColumns, orderColumns);
     }
@@ -45,6 +50,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op) {
+        localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM);
+    }
+
+    @Override
     protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
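
Note: to make the fixed/variable distinction concrete, here is a small standalone sketch (not part of the patch) exercising the two factory methods used by WindowPOperator (variable, minimum 5 frames) and WindowStreamPOperator (fixed, 2 frames). Only the raised value of 128 frames is an invented example.

    import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;

    public class WindowBudgetSketch {
        public static void main(String[] args) {
            // WindowPOperator: variable budget, starts at its 5-frame minimum and may be raised
            // later by SetMemoryRequirementsRule (e.g. to getMaxFramesForWindow()).
            LocalMemoryRequirements window = LocalMemoryRequirements.variableMemoryBudget(5);
            window.setMemoryBudgetInFrames(128);
            System.out.println(window.getMemoryBudgetInFrames()); // 128

            // WindowStreamPOperator: fixed budget of 2 frames; any attempt to change it is rejected.
            LocalMemoryRequirements windowStream = LocalMemoryRequirements.fixedMemoryBudget(2);
            System.out.println(windowStream.getMemoryBudgetInFrames()); // 2
            try {
                windowStream.setMemoryBudgetInFrames(128);
            } catch (IllegalArgumentException e) {
                System.out.println("fixed budget cannot be changed: " + e.getMessage());
            }
        }
    }
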
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
new file mode 100644
index 0000000..38e318c
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/LocalMemoryRequirements.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.properties;
+
+public abstract class LocalMemoryRequirements {
+
+    public abstract int getMinMemoryBudgetInFrames();
+
+    public abstract int getMemoryBudgetInFrames();
+
+    public abstract void setMemoryBudgetInFrames(int value);
+
+    public final long getMemoryBudgetInBytes(long frameSize) {
+        return frameSize * getMemoryBudgetInFrames();
+    }
+
+    public static LocalMemoryRequirements fixedMemoryBudget(int memBudgetInFrames) {
+        if (memBudgetInFrames < 0) {
+            throw new IllegalArgumentException(String.valueOf(memBudgetInFrames));
+        }
+        return memBudgetInFrames == FixedMemoryBudget.ONE_FRAME.memBudgetInFrames ? FixedMemoryBudget.ONE_FRAME
+                : new FixedMemoryBudget(memBudgetInFrames);
+    }
+
+    private static final class FixedMemoryBudget extends LocalMemoryRequirements {
+
+        private static final FixedMemoryBudget ONE_FRAME = new FixedMemoryBudget(1);
+
+        private final int memBudgetInFrames;
+
+        private FixedMemoryBudget(int memBudgetInFrames) {
+            this.memBudgetInFrames = memBudgetInFrames;
+        }
+
+        @Override
+        public int getMinMemoryBudgetInFrames() {
+            return memBudgetInFrames;
+        }
+
+        @Override
+        public int getMemoryBudgetInFrames() {
+            return memBudgetInFrames;
+        }
+
+        @Override
+        public void setMemoryBudgetInFrames(int value) {
+            if (value != memBudgetInFrames) {
+                throw new IllegalArgumentException("Got " + value + ", expected " + memBudgetInFrames);
+            }
+        }
+    }
+
+    public static LocalMemoryRequirements variableMemoryBudget(int minMemBudgetInFrames) {
+        return new VariableMemoryBudget(minMemBudgetInFrames);
+    }
+
+    private static final class VariableMemoryBudget extends LocalMemoryRequirements {
+
+        private final int minMemBudgetInFrames;
+
+        private int memBudgetInFrames;
+
+        private VariableMemoryBudget(int minMemBudgetInFrames) {
+            this.memBudgetInFrames = this.minMemBudgetInFrames = minMemBudgetInFrames;
+        }
+
+        @Override
+        public int getMinMemoryBudgetInFrames() {
+            return minMemBudgetInFrames;
+        }
+
+        @Override
+        public int getMemoryBudgetInFrames() {
+            return memBudgetInFrames;
+        }
+
+        @Override
+        public void setMemoryBudgetInFrames(int value) {
+            if (value < minMemBudgetInFrames) {
+                throw new IllegalArgumentException("Got " + value + ", expected " + minMemBudgetInFrames + " or more");
+            }
+            memBudgetInFrames = value;
+        }
+    }
+}
\ No newline at end of file
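
Note: a short usage sketch of the class above (the 3-frame minimum, 64-frame budget, and 32 KB frame size are illustrative values, not defaults), showing that a variable budget starts at and enforces its declared minimum, and how the byte conversion is derived.

    import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;

    public class LocalMemoryRequirementsSketch {
        public static void main(String[] args) {
            LocalMemoryRequirements reqs = LocalMemoryRequirements.variableMemoryBudget(3);
            System.out.println(reqs.getMinMemoryBudgetInFrames()); // 3
            System.out.println(reqs.getMemoryBudgetInFrames());    // 3 (starts at the minimum)

            reqs.setMemoryBudgetInFrames(64);                       // raising the budget is allowed
            System.out.println(reqs.getMemoryBudgetInBytes(32768)); // 64 * 32768 = 2097152

            try {
                reqs.setMemoryBudgetInFrames(2);                    // below the declared minimum -> rejected
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
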
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index a011abf..749a63c 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -100,7 +100,6 @@
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
@@ -110,7 +109,6 @@
 
     private static final String HASH_MERGE = "hash_merge";
     private static final String TRUE_CONSTANT = "true";
-    private PhysicalOptimizationConfig physicalOptimizationConfig;
     private final FunctionIdentifier rangeMapFunction;
     private final FunctionIdentifier localSamplingFun;
     private final FunctionIdentifier typePropagatingFun;
@@ -147,7 +145,6 @@
         // These are actually logical constraints, so they could be pre-computed
         // somewhere else, too.
 
-        physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
         if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
             AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Optimizing operator " + op.getPhysicalOperator() + ".\n");
         }
@@ -213,7 +210,7 @@
         boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
 
         // The child index of the child operator to optimize first.
-        int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context);
+        int startChildIndex = getStartChildIndex(op, pr, nestedPlan);
         IPartitioningProperty firstDeliveredPartitioning = null;
         // Enforce data properties in a top-down manner.
         for (j = 0; j < op.getInputs().size(); j++) {
@@ -328,8 +325,7 @@
     // Gets the index of a child to start top-down data property enforcement.
     // If there is a partitioning-compatible child with the operator in opRef,
     // start from this child; otherwise, start from child zero.
-    private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
-            IOptimizationContext context) throws AlgebricksException {
+    private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan) {
         IPhysicalPropertiesVector[] reqdProperties = null;
         if (pr != null) {
             reqdProperties = pr.getRequiredProperties();
@@ -468,7 +464,7 @@
         if (pp == null || pp.getPartitioningType() == PartitioningType.UNPARTITIONED) {
             addLocalEnforcers(op, childIndex, diffPropertiesVector.getLocalProperties(), nestedPlan, context);
             IPhysicalPropertiesVector deliveredByNewChild =
-                    ((AbstractLogicalOperator) op.getInputs().get(0).getValue()).getDeliveredPhysicalProperties();
+                    op.getInputs().get(0).getValue().getDeliveredPhysicalProperties();
             if (!nestedPlan) {
                 addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
             }
@@ -555,9 +551,9 @@
         oo.setSourceLocation(sourceLoc);
         oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
         if (isMicroOp) {
-            oo.setPhysicalOperator(new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+            oo.setPhysicalOperator(new MicroStableSortPOperator());
         } else {
-            oo.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+            oo.setPhysicalOperator(new StableSortPOperator());
         }
         oo.getInputs().add(topOp);
         context.computeAndSetTypeEnvironmentForOperator(oo);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
new file mode 100644
index 0000000..b1577bf
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/HybridToInMemoryHashJoinRule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.rewriter.util.JoinUtils;
+
+/**
+ * Must run after {@link SetMemoryRequirementsRule}
+ */
+public final class HybridToInMemoryHashJoinRule implements IAlgebraicRewriteRule {
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                || op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
+            if (joinOp.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HYBRID_HASH_JOIN) {
+                return JoinUtils.hybridToInMemHashJoin(joinOp, context);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+}
\ No newline at end of file
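
Note: the javadoc above requires this rule to run after SetMemoryRequirementsRule. The actual rule sets are assembled elsewhere in the consuming compiler's rule collections (not in this patch); the following is only a hedged sketch of a rule list that respects that ordering.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
    import org.apache.hyracks.algebricks.rewriter.rules.HybridToInMemoryHashJoinRule;
    import org.apache.hyracks.algebricks.rewriter.rules.SetMemoryRequirementsRule;

    public class PhysicalRewriteOrderingSketch {
        public static List<IAlgebraicRewriteRule> physicalRewrites() {
            List<IAlgebraicRewriteRule> rules = new ArrayList<>();
            // Initialize and configure per-operator memory requirements first ...
            rules.add(new SetMemoryRequirementsRule());
            // ... so the hybrid-to-in-memory conversion sees the final join budget.
            rules.add(new HybridToInMemoryHashJoinRule());
            return rules;
        }
    }
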
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 4c57f21..8a42c31 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -104,7 +104,6 @@
                         //replace preclustered gby with sort gby
                         if (!groupByOperator.isGroupAll()) {
                             op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByVarList(),
-                                    context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
                                     sortPhysicalOperator.getSortColumns()));
                         }
                         // remove the stable sort operator
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index e6cdc28..bc853f0 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -174,11 +175,11 @@
 
         protected final IOptimizationContext context;
 
-        protected final PhysicalOptimizationConfig physicalOptimizationConfig;
+        protected final PhysicalOptimizationConfig physConfig;
 
         protected AlgebricksPhysicalOperatorFactoryVisitor(IOptimizationContext context) {
             this.context = context;
-            this.physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+            this.physConfig = context.getPhysicalOptimizationConfig();
         }
 
         @Override
@@ -222,11 +223,9 @@
             }
 
             if (topLevelOp) {
-                return new PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll(),
-                        context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+                return new PreclusteredGroupByPOperator(gby.getGroupByVarList(), gby.isGroupAll());
             } else {
-                return new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
-                        context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy());
+                return new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList());
             }
         }
 
@@ -236,22 +235,27 @@
             if (!hasIntermediateAgg) {
                 return null;
             }
-            return new ExternalGroupByPOperator(gby.getGroupByVarList(),
-                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                            * physicalOptimizationConfig.getFrameSize());
+            return new ExternalGroupByPOperator(gby.getGroupByVarList());
         }
 
         @Override
         public IPhysicalOperator visitInnerJoinOperator(InnerJoinOperator op, Boolean topLevelOp)
                 throws AlgebricksException {
-            JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
-            return op.getPhysicalOperator();
+            return visitAbstractBinaryJoinOperator(op, topLevelOp);
         }
 
         @Override
         public IPhysicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean topLevelOp)
                 throws AlgebricksException {
+            return visitAbstractBinaryJoinOperator(op, topLevelOp);
+        }
+
+        protected IPhysicalOperator visitAbstractBinaryJoinOperator(AbstractBinaryJoinOperator op, Boolean topLevelOp)
+                throws AlgebricksException {
+            if (!topLevelOp) {
+                throw AlgebricksException.create(ErrorCode.OPERATOR_NOT_IMPLEMENTED, op.getSourceLocation(),
+                        op.getOperatorTag().toString() + " (micro)");
+            }
             JoinUtils.setJoinAlgorithmAndExchangeAlgo(op, topLevelOp, context);
             return op.getPhysicalOperator();
         }
@@ -270,9 +274,9 @@
         public IPhysicalOperator visitOrderOperator(OrderOperator oo, Boolean topLevelOp) throws AlgebricksException {
             ensureAllVariables(oo.getOrderExpressions(), Pair::getSecond);
             if (topLevelOp) {
-                return new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK());
+                return new StableSortPOperator(oo.getTopK());
             } else {
-                return new MicroStableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort());
+                return new MicroStableSortPOperator();
             }
         }
 
@@ -470,8 +474,7 @@
         }
 
         protected AbstractWindowPOperator createWindowPOperator(WindowOperator op) throws AlgebricksException {
-            return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false,
-                    context.getPhysicalOptimizationConfig().getMaxFramesForWindow());
+            return new WindowPOperator(op.getPartitionVarList(), op.getOrderColumnList(), false, false, false);
         }
 
         // Physical operators for these operators must have been set already by rules that introduced them
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
new file mode 100644
index 0000000..4cecb07
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+
+/**
+ * Set memory requirements for all operators as follows:
+ * <ol>
+ * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator)}
+ *     to initialize each operator's {@link LocalMemoryRequirements} with minimal memory budget required by
+ *     that operator</li>
+ * <li>Then increase memory requirements for certain operators as specified by {@link PhysicalOptimizationConfig}</li>
+ * </ol>
+ */
+public class SetMemoryRequirementsRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        IPhysicalOperator physOp = op.getPhysicalOperator();
+        if (physOp.getLocalMemoryRequirements() != null) {
+            return false;
+        }
+        computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context));
+        return true;
+    }
+
+    private void computeLocalMemoryRequirements(AbstractLogicalOperator op,
+            ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) throws AlgebricksException {
+        IPhysicalOperator physOp = op.getPhysicalOperator();
+        if (physOp.getLocalMemoryRequirements() == null) {
+            physOp.createLocalMemoryRequirements(op);
+            if (physOp.getLocalMemoryRequirements() == null) {
+                throw new IllegalStateException(physOp.getOperatorTag().toString());
+            }
+            op.accept(memoryRequirementsVisitor, null);
+        }
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : nested.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> root : p.getRoots()) {
+                    computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(),
+                            memoryRequirementsVisitor);
+                }
+            }
+        }
+        for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
+            computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(),
+                    memoryRequirementsVisitor);
+        }
+    }
+
+    protected ILogicalOperatorVisitor<Void, Void> createMemoryRequirementsConfigurator(IOptimizationContext context) {
+        return new MemoryRequirementsConfigurator(context);
+    }
+
+    protected static class MemoryRequirementsConfigurator implements ILogicalOperatorVisitor<Void, Void> {
+
+        protected final IOptimizationContext context;
+
+        protected final PhysicalOptimizationConfig physConfig;
+
+        protected MemoryRequirementsConfigurator(IOptimizationContext context) {
+            this.context = context;
+            this.physConfig = context.getPhysicalOptimizationConfig();
+        }
+
+        // helper methods
+
+        protected void setOperatorMemoryBudget(AbstractLogicalOperator op, int memBudgetInFrames)
+                throws AlgebricksException {
+            LocalMemoryRequirements memoryReqs = op.getPhysicalOperator().getLocalMemoryRequirements();
+            int minBudgetInFrames = memoryReqs.getMinMemoryBudgetInFrames();
+            if (memBudgetInFrames < minBudgetInFrames) {
+                throw AlgebricksException.create(ErrorCode.ILLEGAL_MEMORY_BUDGET, op.getSourceLocation(),
+                        op.getOperatorTag().toString(), memBudgetInFrames * physConfig.getFrameSize(),
+                        minBudgetInFrames * physConfig.getFrameSize());
+            }
+            memoryReqs.setMemoryBudgetInFrames(memBudgetInFrames);
+        }
+
+        // variable memory operators
+
+        @Override
+        public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+            setOperatorMemoryBudget(op, physConfig.getMaxFramesExternalSort());
+            return null;
+        }
+
+        @Override
+        public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+            setOperatorMemoryBudget(op, physConfig.getMaxFramesForGroupBy());
+            return null;
+        }
+
+        @Override
+        public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+            if (op.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.WINDOW) {
+                setOperatorMemoryBudget(op, physConfig.getMaxFramesForWindow());
+            }
+            return null;
+        }
+
+        @Override
+        public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+            return visitJoinOperator(op, arg);
+        }
+
+        @Override
+        public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+            return visitJoinOperator(op, arg);
+        }
+
+        protected Void visitJoinOperator(AbstractBinaryJoinOperator op, Void arg) throws AlgebricksException {
+            setOperatorMemoryBudget(op, physConfig.getMaxFramesForJoin());
+            return null;
+        }
+
+        @Override
+        public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+            return visitAbstractUnnestMapOperator(op, arg);
+        }
+
+        @Override
+        public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg)
+                throws AlgebricksException {
+            return visitAbstractUnnestMapOperator(op, arg);
+        }
+
+        protected Void visitAbstractUnnestMapOperator(AbstractUnnestMapOperator op, Void arg)
+                throws AlgebricksException {
+            IPhysicalOperator physOp = op.getPhysicalOperator();
+            if (physOp.getOperatorTag() == PhysicalOperatorTag.LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH
+                    || physOp.getOperatorTag() == PhysicalOperatorTag.SINGLE_PARTITION_INVERTED_INDEX_SEARCH) {
+                setOperatorMemoryBudget(op, physConfig.getMaxFramesForTextSearch());
+            }
+            return null;
+        }
+
+        // fixed memory operators
+
+        @Override
+        public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg)
+                throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+                throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
+        @Override
+        public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3042548..6687027 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -52,7 +52,7 @@
     }
 
     public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean topLevelOp,
-            IOptimizationContext context) throws AlgebricksException {
+            IOptimizationContext context) {
         if (!topLevelOp) {
             throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
         }
@@ -87,30 +87,28 @@
                 }
             }
         } else {
-            setNestedLoopJoinOp(op, context);
+            setNestedLoopJoinOp(op);
         }
     }
 
-    private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
-        op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
-                context.getPhysicalOptimizationConfig().getMaxFramesForJoin()));
+    private static void setNestedLoopJoinOp(AbstractBinaryJoinOperator op) {
+        op.setPhysicalOperator(new NestedLoopJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST));
     }
 
     private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,
-            List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)
-            throws AlgebricksException {
+            List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context) {
         op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
-                context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
                 context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput(),
                 context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(),
                 context.getPhysicalOptimizationConfig().getFudgeFactor()));
-        if (partitioningType == JoinPartitioningType.BROADCAST) {
-            hybridToInMemHashJoin(op, context);
-        }
     }
 
-    private static void hybridToInMemHashJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)
+    public static boolean hybridToInMemHashJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)
             throws AlgebricksException {
+        HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) op.getPhysicalOperator();
+        if (hhj.getPartitioningType() != JoinPartitioningType.BROADCAST) {
+            return false;
+        }
         ILogicalOperator opBuild = op.getInputs().get(1).getValue();
         LogicalPropertiesVisitor.computeLogicalPropertiesDFS(opBuild, context);
         ILogicalPropertiesVector v = context.getLogicalPropertiesVector(opBuild);
@@ -121,19 +119,19 @@
         }
         if (v != null) {
             int size2 = v.getMaxOutputFrames();
-            HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) op.getPhysicalOperator();
-            if (size2 > 0 && size2 * hhj.getFudgeFactor() <= hhj.getMemSizeInFrames()) {
+            int hhjMemSizeInFrames = hhj.getLocalMemoryRequirements().getMemoryBudgetInFrames();
+            if (size2 > 0 && size2 * hhj.getFudgeFactor() <= hhjMemSizeInFrames) {
                 if (loggerTraceEnabled) {
                     AlgebricksConfig.ALGEBRICKS_LOGGER
                             .trace("// HybridHashJoin inner branch " + opBuild.getOperatorTag() + " fits in memory\n");
                 }
                 // maintains the local properties on the probe side
-                op.setPhysicalOperator(
-                        new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(), hhj.getKeysLeftBranch(),
-                                hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2, hhj.getMemSizeInFrames()));
+                op.setPhysicalOperator(new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(),
+                        hhj.getKeysLeftBranch(), hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
+                return true;
             }
         }
-
+        return false;
     }
 
     private static boolean isHashJoinCondition(ILogicalExpression e, Collection<LogicalVariable> inLeftAll,