Merge branch 'master' of https://code.google.com/p/hyracks
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index e0ee1f0..d66ddb3 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -62,7 +62,6 @@
     protected IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
     protected PhysicalOptimizationConfig physicalOptimizationConfig = new PhysicalOptimizationConfig();
     protected AlgebricksPartitionConstraint clusterLocations;
-    protected int frameSize = -1;
 
     public abstract ICompilerFactory create();
 
@@ -195,14 +194,6 @@
         return normalizedKeyComputerFactoryProvider;
     }
 
-    public void setFrameSize(int frameSize) {
-        this.frameSize = frameSize;
-    }
-
-    public int getFrameSize() {
-        return frameSize;
-    }
-
     public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
         return partialAggregationTypeComputer;
     }
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 1c62307..5da4e67 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -42,12 +42,12 @@
         }
 
         @Override
-        public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+        public IOptimizationContext createOptimizationContext(int varCounter,
                 IExpressionEvalSizeComputer expressionEvalSizeComputer,
                 IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
                 IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
                 PhysicalOptimizationConfig physicalOptimizationConfig) {
-            return new AlgebricksOptimizationContext(varCounter, frameSize, expressionEvalSizeComputer,
+            return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
                     physicalOptimizationConfig);
         }
@@ -69,7 +69,7 @@
             @Override
             public ICompiler createCompiler(final ILogicalPlan plan, final IMetadataProvider<?, ?> metadata,
                     int varCounter) {
-                final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter, frameSize,
+                final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
                         expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
                         nullableTypeComputer, physicalOptimizationConfig);
                 oc.setMetadataDeclarations(metadata);
@@ -91,7 +91,9 @@
                                 binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
                                 nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
                                 expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, predEvaluatorFactoryProvider, frameSize, clusterLocations);
+                                partialAggregationTypeComputer, predEvaluatorFactoryProvider,
+                                physicalOptimizationConfig.getFrameSize(), clusterLocations);
+
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, null, jobEventListenerFactory);
                     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 0aa1ff6..4f9ddf0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -32,9 +32,6 @@
 
     public abstract int getVarCounter();
 
-    // -1 if unknown
-    public abstract int getFrameSize();
-
     public abstract void setVarCounter(int varCounter);
 
     public abstract LogicalVariable newVar();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index e4fd78e..1979048 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -294,7 +294,7 @@
                         }
                         overhead += sz;
                     }
-                    int frameSize = context.getFrameSize();
+                    int frameSize = context.getPhysicalOptimizationConfig().getFrameSize();
                     if (frameSize > 0) {
                         long sz = frames0 * frameSize + overhead * v.getNumberOfTuples();
                         int frames1 = (int) (sz / frameSize);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 738fc7f..79b463f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -70,17 +70,14 @@
     protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
 
     protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
-    private final int frameSize;
     private final IExpressionTypeComputer expressionTypeComputer;
     private final INullableTypeComputer nullableTypeComputer;
 
-    public AlgebricksOptimizationContext(int varCounter, int frameSize,
-            IExpressionEvalSizeComputer expressionEvalSizeComputer,
+    public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
             PhysicalOptimizationConfig physicalOptimizationConfig) {
         this.varCounter = varCounter;
-        this.frameSize = frameSize;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
         this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
         this.expressionTypeComputer = expressionTypeComputer;
@@ -220,11 +217,6 @@
         return varEvalSizeEnv;
     }
 
-    @Override
-    public int getFrameSize() {
-        return frameSize;
-    }
-
     public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
         return mergeAggregationExpressionFactory;
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 1373fa7..9f141e3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -21,7 +21,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
 
 public interface IOptimizationContextFactory {
-    public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+    public IOptimizationContext createOptimizationContext(int varCounter,
             IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index fc6c198..c3858ba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -4,10 +4,15 @@
 
 public class PhysicalOptimizationConfig {
     private static final int MB = 1048576;
+    
     private static final String FRAMESIZE = "FRAMESIZE";
     private static final String MAX_FRAMES_EXTERNAL_SORT = "MAX_FRAMES_EXTERNAL_SORT";
     private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
-
+    private static final String MAX_FRAMES_LEFT_INPUT_HYBRID_HASH = "MAX_FRAMES_LEFT_INPUT_HYBRID_HASH";
+    private static final String MAX_FRAMES_HYBRID_HASH = "MAX_FRAMES_HYBRID_HASH";
+    private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
+    private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
+    
     private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE";
     private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE";
     private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
@@ -33,14 +38,39 @@
     public void setFrameSize(int frameSize) {
         setInt(FRAMESIZE, frameSize);
     }
-
-    public int getMaxFramesExternalSort() {
-        int frameSize = getFrameSize();
-        return getInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 512 * MB) / frameSize));
+    
+    public double getFudgeFactor() {
+        return getDouble(FUDGE_FACTOR, 1.3);
     }
 
-    public void setMaxFramesExternalSort(int frameLimit) {
-        setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
+    public void setFudgeFactor(double fudgeFactor) {
+        setDouble(FUDGE_FACTOR, fudgeFactor);
+    }
+    
+    public int getMaxRecordsPerFrame() {
+        return getInt(MAX_RECORDS_PER_FRAME, 512);
+    }
+
+    public void setMaxRecordsPerFrame(int maxRecords) {
+        setInt(MAX_RECORDS_PER_FRAME, maxRecords);
+    }
+
+    public int getMaxFramesLeftInputHybridHash() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, (int) (140L * 1024 * MB / frameSize));
+    }
+
+    public void setMaxFramesLeftInputHybridHash(int frameLimit) {
+        setInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, frameLimit);
+    }
+    
+    public int getMaxFramesHybridHash() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_HYBRID_HASH, (int) (64L * MB / frameSize));
+    }
+
+    public void setMaxFramesHybridHash(int frameLimit) {
+        setInt(MAX_FRAMES_HYBRID_HASH, frameLimit);
     }
 
     public int getMaxFramesExternalGroupBy() {
@@ -51,6 +81,15 @@
     public void setMaxFramesExternalGroupBy(int frameLimit) {
         setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, frameLimit);
     }
+    
+    public int getMaxFramesExternalSort() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+    }
+
+    public void setMaxFramesExternalSort(int frameLimit) {
+        setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
+    }
 
     public int getHashGroupByTableSize() {
         return getInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
@@ -87,5 +126,29 @@
         else
             return Integer.parseInt(value);
     }
+
+    /**
+     * Stores a double-valued setting under the given key.
+     *
+     * @param property key to write in {@code properties}
+     * @param value number to store; it is saved as its {@link Double#toString(double)} text
+     */
+    private void setDouble(String property, double value) {
+        String text = Double.toString(value);
+        properties.setProperty(property, text);
+    }
+
+    /**
+     * Reads a double-valued setting.
+     *
+     * @param property key to look up in {@code properties}
+     * @param defaultValue result to use when the key has no stored value
+     * @return the parsed stored value, or {@code defaultValue} when the lookup yields {@code null}
+     * @throws NumberFormatException if the stored text cannot be parsed as a double
+     */
+    private double getDouble(String property, double defaultValue) {
+        String text = properties.getProperty(property);
+        return text == null ? defaultValue : Double.parseDouble(text);
+    }
 
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java
index ddc00e3..b2eb07b 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -46,14 +46,6 @@
 
 public class JoinUtils {
 
-    private final static int MB = 1048576;
-
-    private final static double DEFAULT_FUDGE_FACTOR = 1.3;
-    private final static int MAX_RECORDS_PER_FRAME = 512;
-    private final static int DEFAULT_FRAME_SIZE = 32768;
-    private final static int MAX_LEFT_INPUT_SIZE_HYBRID_HASH = (int) (140L * 1024 * MB / DEFAULT_FRAME_SIZE);
-    private final static int DEFAULT_MEMORY_SIZE_HYBRID_HASH = (int) (256L * MB / DEFAULT_FRAME_SIZE);
-
     public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
             throws AlgebricksException {
         List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>();
@@ -82,21 +74,28 @@
                 }
             }
         } else {
-            setNLJoinOp(op);
+            setNLJoinOp(op, context);
         }
     }
 
-    private static void setNLJoinOp(AbstractBinaryJoinOperator op) {
-        op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
-                DEFAULT_MEMORY_SIZE_HYBRID_HASH));
+    /**
+     * Makes a broadcast nested-loop join the physical operator of {@code op}.
+     * The last constructor argument takes the place of the removed DEFAULT_MEMORY_SIZE_HYBRID_HASH
+     * (a frame count), so it is read from getMaxFramesHybridHash() rather than getMaxRecordsPerFrame().
+     */
+    private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
+        op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context
+                .getPhysicalOptimizationConfig().getMaxFramesHybridHash()));
     }
 
     private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,
             List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)
             throws AlgebricksException {
         op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
-                DEFAULT_MEMORY_SIZE_HYBRID_HASH, MAX_LEFT_INPUT_SIZE_HYBRID_HASH, MAX_RECORDS_PER_FRAME,
-                DEFAULT_FUDGE_FACTOR));
+                context.getPhysicalOptimizationConfig().getMaxFramesHybridHash(), context
+                        .getPhysicalOptimizationConfig().getMaxFramesLeftInputHybridHash(), context
+                        .getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), context
+                        .getPhysicalOptimizationConfig().getFudgeFactor()));
         if (partitioningType == JoinPartitioningType.BROADCAST) {
             hybridToInMemHashJoin(op, context);
         }