merged hyracks_asterix_stabilization r1724:1760

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1762 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 7be425e..f188265 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -75,7 +75,7 @@
                 pp = pv0.getPartitioningProperty();
             }
         } else {
-            pp = null;
+            pp = IPartitioningProperty.UNPARTITIONED;
         }
         this.deliveredProperties = new StructuralPropertiesVector(pp, deliveredLocalProperties(iop, context));
     }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index bcd31a6..8cbd2d8 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -97,7 +97,7 @@
                 pp = pv1.getPartitioningProperty();
             }
         } else {
-            pp = null;
+        	pp = IPartitioningProperty.UNPARTITIONED;
         }
 
         List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index b6b6042..3382c6e 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
 
 import java.util.Collection;
+import java.util.logging.Level;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
@@ -61,19 +62,31 @@
         return rewriteOperatorRef(opRef, rule, true, false);
     }
 
-    private void printRuleApplication(IAlgebraicRewriteRule rule, Mutable<ILogicalOperator> opRef)
+    private String getPlanString(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        if (AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(Level.FINE)) {
+            StringBuilder sb = new StringBuilder();
+            PlanPrettyPrinter.printOperator((AbstractLogicalOperator) opRef.getValue(), sb, pvisitor, 0);
+            return sb.toString();
+        }
+        return null;
+    }
+
+    private void printRuleApplication(IAlgebraicRewriteRule rule, String beforePlan, String afterPlan)
             throws AlgebricksException {
-        AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Rule " + rule.getClass() + " fired.\n");
-        StringBuilder sb = new StringBuilder();
-        PlanPrettyPrinter.printOperator((AbstractLogicalOperator) opRef.getValue(), sb, pvisitor, 0);
-        AlgebricksConfig.ALGEBRICKS_LOGGER.fine(sb.toString());
+        if (AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(Level.FINE)) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Rule " + rule.getClass() + " fired.\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Before plan\n" + beforePlan + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> After plan\n" + afterPlan + "\n");
+        }
     }
 
     protected boolean rewriteOperatorRef(Mutable<ILogicalOperator> opRef, IAlgebraicRewriteRule rule,
             boolean enterNestedPlans, boolean fullDFS) throws AlgebricksException {
 
+        String preBeforePlan = getPlanString(opRef);
         if (rule.rewritePre(opRef, context)) {
-            printRuleApplication(rule, opRef);
+            String preAfterPlan = getPlanString(opRef);
+            printRuleApplication(rule, preBeforePlan, preAfterPlan);
             return true;
         }
         boolean rewritten = false;
@@ -105,8 +118,10 @@
             }
         }
 
+        String postBeforePlan = getPlanString(opRef);
         if (rule.rewritePost(opRef, context)) {
-            printRuleApplication(rule, opRef);
+            String postAfterPlan = getPlanString(opRef);
+            printRuleApplication(rule, postBeforePlan, postAfterPlan);
             return true;
         }
 
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
new file mode 100644
index 0000000..4521d1a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Complex rewrite rule for producing joins from unnests.
+ * This rule is limited to creating left-deep trees.
+ */
+public class ComplexUnnestToProductRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN
+                && op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return false;
+        }
+
+        // We may pull selects above the join we create in order to eliminate possible dependencies between
+        // the outer and inner input plans of the join.
+        List<ILogicalOperator> topSelects = new ArrayList<ILogicalOperator>();
+
+        // Keep track of the operators and used variables participating in the inner input plan.
+        HashSet<LogicalVariable> innerUsedVars = new HashSet<LogicalVariable>();
+        List<ILogicalOperator> innerOps = new ArrayList<ILogicalOperator>();
+        HashSet<LogicalVariable> outerUsedVars = new HashSet<LogicalVariable>();
+        List<ILogicalOperator> outerOps = new ArrayList<ILogicalOperator>();
+        innerOps.add(op);
+        VariableUtilities.getUsedVariables(op, innerUsedVars);
+
+        Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
+
+        // Find an unnest or join and partition the plan between the first unnest and that operator into independent parts.
+        if (!findPlanPartition(op2, innerUsedVars, outerUsedVars, innerOps, outerOps, topSelects, false)) {
+            // We could not find an unnest or join.
+            return false;
+        }
+        // The last operator must be an unnest or join.
+        AbstractLogicalOperator unnestOrJoin = (AbstractLogicalOperator) outerOps.get(outerOps.size() - 1);
+
+        ILogicalOperator outerRoot = null;
+        ILogicalOperator innerRoot = null;
+        EmptyTupleSourceOperator ets = new EmptyTupleSourceOperator();
+        // If we found a join, simply use it as the outer root.
+        if (unnestOrJoin.getOperatorTag() != LogicalOperatorTag.INNERJOIN
+                && unnestOrJoin.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
+            // We've found a second unnest. First, sanity check that the unnest does not produce any vars that are used by the plan above (until the first unnest).
+            List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getProducedVariables(unnestOrJoin, producedVars);
+            for (LogicalVariable producedVar : producedVars) {
+                if (innerUsedVars.contains(producedVar)) {
+                    return false;
+                }
+            }
+            // Continue finding a partitioning of the plan such that the inner and outer partitions are independent, in order to feed a join.
+            // Now, we look below the second unnest or join.
+            VariableUtilities.getUsedVariables(unnestOrJoin, outerUsedVars);
+            AbstractLogicalOperator unnestChild = (AbstractLogicalOperator) unnestOrJoin.getInputs().get(0).getValue();
+            if (!findPlanPartition(unnestChild, innerUsedVars, outerUsedVars, innerOps, outerOps, topSelects, true)) {
+                // We could not find a suitable partitioning.
+                return false;
+            }
+        }
+        innerRoot = buildOperatorChain(innerOps, ets, context);
+        context.computeAndSetTypeEnvironmentForOperator(innerRoot);
+        outerRoot = buildOperatorChain(outerOps, null, context);
+        context.computeAndSetTypeEnvironmentForOperator(outerRoot);
+
+        InnerJoinOperator product = new InnerJoinOperator(
+                new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        // Outer branch.
+        product.getInputs().add(new MutableObject<ILogicalOperator>(outerRoot));
+        // Inner branch.
+        product.getInputs().add(new MutableObject<ILogicalOperator>(innerRoot));
+        context.computeAndSetTypeEnvironmentForOperator(product);
+        // Put the selects on top of the join.
+        ILogicalOperator topOp = product;
+        if (!topSelects.isEmpty()) {
+            topOp = buildOperatorChain(topSelects, product, context);
+        }
+        // Plug the selects + product in the plan.
+        opRef.setValue(topOp);
+        context.computeAndSetTypeEnvironmentForOperator(topOp);
+        return true;
+    }
+
+    private ILogicalOperator buildOperatorChain(List<ILogicalOperator> ops, ILogicalOperator bottomOp,
+            IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator root = ops.get(0);
+        ILogicalOperator prevOp = root;
+        for (int i = 1; i < ops.size(); i++) {
+            ILogicalOperator inputOp = ops.get(i);
+            prevOp.getInputs().clear();
+            prevOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
+            prevOp = inputOp;
+        }
+        if (bottomOp != null) {
+            context.computeAndSetTypeEnvironmentForOperator(bottomOp);
+            prevOp.getInputs().clear();
+            prevOp.getInputs().add(new MutableObject<ILogicalOperator>(bottomOp));
+        }
+        return root;
+    }
+
+    private boolean findPlanPartition(AbstractLogicalOperator op, HashSet<LogicalVariable> innerUsedVars,
+            HashSet<LogicalVariable> outerUsedVars, List<ILogicalOperator> innerOps, List<ILogicalOperator> outerOps,
+            List<ILogicalOperator> topSelects, boolean belowSecondUnnest) throws AlgebricksException {
+        if (belowSecondUnnest && innerUsedVars.isEmpty()) {
+            // Trivially joinable.
+            return true;
+        }
+        if (!belowSecondUnnest && op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+            // Bail on subplan.
+            return false;
+        }
+        switch (op.getOperatorTag()) {
+            case UNNEST:
+            case DATASOURCESCAN: {
+                // We may have reached this state by descending through a subplan.
+                outerOps.add(op);
+                return true;
+            }
+            case INNERJOIN:
+            case LEFTOUTERJOIN: {
+                // Make sure that no variables that are live under this join are needed by the inner.
+                List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getLiveVariables(op, liveVars);
+                for (LogicalVariable liveVar : liveVars) {
+                    if (innerUsedVars.contains(liveVar)) {
+                        return false;
+                    }
+                }
+                outerOps.add(op);
+                return true;
+            }
+            case SELECT: {
+                // Remember this select to pulling it above the join.
+                if (innerUsedVars.isEmpty()) {
+                    outerOps.add(op);
+                } else {
+                    topSelects.add(op);
+                }
+                break;
+            }
+            case PROJECT: {
+                // Throw away projects from the plan since we are pulling selects up.
+                break;
+            }
+            case EMPTYTUPLESOURCE:
+            case NESTEDTUPLESOURCE: {
+                if (belowSecondUnnest) {
+                    // We have successfully partitioned the plan into independent parts to be plugged into the join.
+                    return true;
+                } else {
+                    // We could not find a second unnest or a join.
+                    return false;
+                }
+            }
+            default: {
+                // The inner is trivially independent.
+                if (!belowSecondUnnest && innerUsedVars.isEmpty()) {
+                    outerOps.add(op);
+                    break;
+                }
+
+                // Examine produced vars to determine which partition uses them.
+                List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getProducedVariables(op, producedVars);
+                int outerMatches = 0;
+                int innerMatches = 0;
+                for (LogicalVariable producedVar : producedVars) {
+                    if (outerUsedVars.contains(producedVar)) {
+                        outerMatches++;
+                    } else if (innerUsedVars.contains(producedVar)) {
+                        innerMatches++;
+                    }
+                }
+
+                HashSet<LogicalVariable> targetUsedVars = null;
+                if (outerMatches == producedVars.size() && !producedVars.isEmpty()) {
+                    // All produced vars used by outer partition.
+                    outerOps.add(op);
+                    targetUsedVars = outerUsedVars;
+                } else if (innerMatches == producedVars.size() && !producedVars.isEmpty()) {
+                    // All produced vars used by inner partition.
+                    innerOps.add(op);
+                    targetUsedVars = innerUsedVars;
+                } else if (innerMatches == 0 && outerMatches == 0) {
+                    // Op produces variables that are not used in the part of the plan we've seen (or it doesn't produce any vars).
+                    // Try to figure out where it belongs by analyzing the used variables.
+                    List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+                    VariableUtilities.getUsedVariables(op, usedVars);
+                    for (LogicalVariable usedVar : usedVars) {
+                        if (outerUsedVars.contains(usedVar)) {
+                            outerOps.add(op);
+                            targetUsedVars = outerUsedVars;
+                            break;
+                        }
+                        if (innerUsedVars.contains(usedVar)) {
+                            innerOps.add(op);
+                            targetUsedVars = innerUsedVars;
+                            break;
+                        }
+                    }
+                    // TODO: For now we bail here, but we could remember such ops and determine their target partition at a later point.
+                    if (targetUsedVars == null) {
+                        return false;
+                    }
+                } else {
+                    // The current operator produces variables that are used by both partitions, so the inner and outer are not independent and, therefore, we cannot create a join.
+                    // TODO: We may still be able to split the operator to create a viable partitioning.
+                    return false;
+                }
+                // Update used variables of partition that op belongs to.
+                if (op.hasNestedPlans() && op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                    AbstractOperatorWithNestedPlans opWithNestedPlans = (AbstractOperatorWithNestedPlans) op;
+                    opWithNestedPlans.getUsedVariablesExceptNestedPlans(targetUsedVars);
+                } else {
+                    VariableUtilities.getUsedVariables(op, targetUsedVars);
+                }
+                break;
+            }
+        }
+        if (!op.hasInputs()) {
+            if (!belowSecondUnnest) {
+                // We could not find a second unnest or a join.
+                return false;
+            } else {
+                // We have successfully partitioned the plan into independent parts to be plugged into the join.
+                return true;
+            }
+        }
+        return findPlanPartition((AbstractLogicalOperator) op.getInputs().get(0).getValue(), innerUsedVars,
+                outerUsedVars, innerOps, outerOps, topSelects, belowSecondUnnest);
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 9b98958..b79c31b 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -14,7 +14,10 @@
  */
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -24,11 +27,13 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class SimpleUnnestToProductRule implements IAlgebraicRewriteRule {
@@ -42,7 +47,8 @@
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN
+                && op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
             return false;
         }
 
@@ -52,6 +58,11 @@
         if (!(op2 instanceof AbstractScanOperator) && !descOrSelfIsSourceScan(op2)) {
             return false;
         }
+        // Make sure that op does not use any variables produced by op2.
+        if (!opsAreIndependent(op, op2)) {
+            return false;
+        }
+
         InnerJoinOperator product = new InnerJoinOperator(
                 new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
 
@@ -69,7 +80,12 @@
     }
 
     private boolean descOrSelfIsSourceScan(AbstractLogicalOperator op2) {
-        if (op2.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+        // Disregard data source scans in a subplan.
+        if (op2.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+            return false;
+        }
+        if (op2.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN
+                && op2.getOperatorTag() != LogicalOperatorTag.UNNEST) {
             return true;
         }
         for (Mutable<ILogicalOperator> cRef : op2.getInputs()) {
@@ -81,4 +97,17 @@
         return false;
     }
 
+    private boolean opsAreIndependent(ILogicalOperator unnestOp, ILogicalOperator outer) throws AlgebricksException {
+        List<LogicalVariable> opUsedVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(unnestOp, opUsedVars);
+        Set<LogicalVariable> op2LiveVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(outer, op2LiveVars);
+        for (LogicalVariable usedVar : opUsedVars) {
+            if (op2LiveVars.contains(usedVar)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index e90eaf5..c947bcc 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -164,7 +164,7 @@
                 ctx.interiorFrame.setSmFlag(false);
                 // Initialize new root (leftNode becomes new root).
                 ctx.interiorFrame.setPage(leftNode);
-                ctx.interiorFrame.initBuffer((byte) (ctx.leafFrame.getLevel() + 1));
+                ctx.interiorFrame.initBuffer((byte) (ctx.interiorFrame.getLevel() + 1));
                 // Will be cleared later in unsetSmPages.
                 ctx.interiorFrame.setSmFlag(true);
                 ctx.splitKey.setLeftPage(newLeftId);
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index c652151..e170a95 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -37,10 +37,9 @@
     private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
     private static final int MAP_FACTOR = 2;
 
-    private static final int MAX_VICTIMIZATION_TRY_COUNT = 5;
-    private static final int MAX_WAIT_FOR_CLEANER_THREAD_TIME = 1000 * 60;
-    private static final int MIN_CLEANED_COUNT_DIFF = 4;
-
+    private static final int MIN_CLEANED_COUNT_DIFF = 3;
+    private static final int PIN_MAX_WAIT_TIME = 50;
+    
     private final int maxOpenFiles;
 
     private final IIOManager ioManager;
@@ -111,7 +110,6 @@
     @Override
     public ICachedPage tryPin(long dpid) throws HyracksDataException {
         pinSanityCheck(dpid);
-
         CachedPage cPage = null;
         int hash = hash(dpid);
         CacheBucket bucket = pageMap[hash];
@@ -129,28 +127,27 @@
         } finally {
             bucket.bucketLock.unlock();
         }
-
         return cPage;
     }
 
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         pinSanityCheck(dpid);
-
         CachedPage cPage = findPage(dpid, newPage);
-        if (cPage == null) {
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine(dumpState());
-            }
-            throw new HyracksDataException("Failed to pin page " + BufferedFileHandle.getFileId(dpid) + ":"
-                    + BufferedFileHandle.getPageId(dpid) + " because all pages are pinned.");
-        }
         if (!newPage) {
-            // Resolve race of multiple threads trying to read the page from disk.
-            synchronized (cPage) {
-                if (!cPage.valid) {
-                    read(cPage);
+            if (!cPage.valid) {
+                /*
+                 * We got a buffer and we have pinned it. But its invalid. If its a new page, we just mark it as valid
+                 * and return. Or else, while we hold the page lock, we get a write latch on the data and start a read.
+                 */
+                cPage.acquireWriteLatch(false);
+                try {
+                    if (!cPage.valid) {
+                        read(cPage);
+                    }
                     cPage.valid = true;
+                } finally {
+                    cPage.releaseWriteLatch();
                 }
             }
         } else {
@@ -161,14 +158,12 @@
     }
 
     private CachedPage findPage(long dpid, boolean newPage) {
-        int victimizationTryCount = 0;
         while (true) {
             int startCleanedCount = cleanerThread.cleanedCount;
 
             CachedPage cPage = null;
             /*
-             * Hash dpid to get a bucket and then check if the page exists in
-             * the bucket.
+             * Hash dpid to get a bucket and then check if the page exists in the bucket.
              */
             int hash = hash(dpid);
             CacheBucket bucket = pageMap[hash];
@@ -186,38 +181,29 @@
                 bucket.bucketLock.unlock();
             }
             /*
-             * If we got here, the page was not in the hash table. Now we ask
-             * the page replacement strategy to find us a victim.
+             * If we got here, the page was not in the hash table. Now we ask the page replacement strategy to find us a victim.
              */
             CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
             if (victim != null) {
                 /*
-                 * We have a victim with the following invariants. 1. The dpid
-                 * on the CachedPage may or may not be valid. 2. We have a pin
-                 * on the CachedPage. We have to deal with three cases here.
-                 * Case 1: The dpid on the CachedPage is invalid (-1). This
-                 * indicates that this buffer has never been used. So we are the
-                 * only ones holding it. Get a lock on the required dpid's hash
-                 * bucket, check if someone inserted the page we want into the
-                 * table. If so, decrement the pincount on the victim and return
-                 * the winner page in the table. If such a winner does not
-                 * exist, insert the victim and return it. Case 2: The dpid on
-                 * the CachedPage is valid. Case 2a: The current dpid and
-                 * required dpid hash to the same bucket. Get the bucket lock,
-                 * check that the victim is still at pinCount == 1 If so check
-                 * if there is a winning CachedPage with the required dpid. If
-                 * so, decrement the pinCount on the victim and return the
-                 * winner. If not, update the contents of the CachedPage to hold
-                 * the required dpid and return it. If the picCount on the
-                 * victim was != 1 or CachedPage was dirty someone used the
-                 * victim for its old contents -- Decrement the pinCount and
-                 * retry. Case 2b: The current dpid and required dpid hash to
-                 * different buckets. Get the two bucket locks in the order of
-                 * the bucket indexes (Ordering prevents deadlocks). Check for
-                 * the existence of a winner in the new bucket and for potential
-                 * use of the victim (pinCount != 1). If everything looks good,
-                 * remove the CachedPage from the old bucket, and add it to the
-                 * new bucket and update its header with the new dpid.
+                 * We have a victim with the following invariants.
+                 * 1. The dpid on the CachedPage may or may not be valid.
+                 * 2. We have a pin on the CachedPage. We have to deal with three cases here.
+                 *  Case 1: The dpid on the CachedPage is invalid (-1). This indicates that this buffer has never been used.
+                 *  So we are the only ones holding it. Get a lock on the required dpid's hash bucket, check if someone inserted
+                 *  the page we want into the table. If so, decrement the pincount on the victim and return the winner page in the
+                 *  table. If such a winner does not exist, insert the victim and return it.
+                 *  Case 2: The dpid on the CachedPage is valid.
+                 *      Case 2a: The current dpid and required dpid hash to the same bucket.
+                 *      Get the bucket lock, check that the victim is still at pinCount == 1 If so check if there is a winning
+                 *      CachedPage with the required dpid. If so, decrement the pinCount on the victim and return the winner.
+                 *      If not, update the contents of the CachedPage to hold the required dpid and return it. If the picCount
+                 *      on the victim was != 1 or CachedPage was dirty someone used the victim for its old contents -- Decrement
+                 *      the pinCount and retry.
+                 *  Case 2b: The current dpid and required dpid hash to different buckets. Get the two bucket locks in the order
+                 *  of the bucket indexes (Ordering prevents deadlocks). Check for the existence of a winner in the new bucket
+                 *  and for potential use of the victim (pinCount != 1). If everything looks good, remove the CachedPage from
+                 *  the old bucket, and add it to the new bucket and update its header with the new dpid.
                  */
                 if (victim.dpid < 0) {
                     /*
@@ -313,14 +299,6 @@
                     return victim;
                 }
             }
-            /*
-             * Victimization failed -- all pages pinned? wait a bit, increment
-             * victimizationTryCount and loop around. Give up after
-             * MAX_VICTIMIZATION_TRY_COUNT trys.
-             */
-            if (++victimizationTryCount >= MAX_VICTIMIZATION_TRY_COUNT) {
-                return null;
-            }
             synchronized (cleanerThread) {
                 cleanerThread.notifyAll();
             }
@@ -333,7 +311,7 @@
             }
             synchronized (cleanerThread.cleanNotification) {
                 try {
-                    cleanerThread.cleanNotification.wait(MAX_WAIT_FOR_CLEANER_THREAD_TIME);
+                    cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
                 } catch (InterruptedException e) {
                     // Do nothing
                 }
@@ -658,8 +636,7 @@
                             }
                             fileInfoMap.remove(entryFileId);
                             unreferencedFileFound = true;
-                            // for-each iterator is invalid because we changed
-                            // fileInfoMap
+                            // for-each iterator is invalid because we changed fileInfoMap
                             break;
                         }
                     }
@@ -793,7 +770,7 @@
             } finally {
                 fileMapManager.unregisterFile(fileId);
                 if (fInfo != null) {
-                    // Mark the fInfo as deleted,
+                    // Mark the fInfo as deleted, 
                     // such that when its pages are reclaimed in openFile(),
                     // the pages are not flushed to disk but only invalidates.
                     ioManager.close(fInfo.getFileHandle());