merged hyracks_lsm_tree and fullstack_asterix_stabilization

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3026 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-am-rtree/pom.xml b/hyracks/hyracks-storage-am-rtree/pom.xml
index ade69ef..dce1add 100644
--- a/hyracks/hyracks-storage-am-rtree/pom.xml
+++ b/hyracks/hyracks-storage-am-rtree/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-am-rtree</artifactId>
-  <version>0.2.3-SNAPSHOT</version>
   <name>hyracks-storage-am-rtree</name>
 
   <parent>
@@ -31,27 +30,27 @@
   		<version>0.2.3-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
-  	</dependency>  	
+  	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
   		<version>0.2.3-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
-  	</dependency>  	
+  	</dependency>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
   		<version>0.2.3-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
-  	</dependency>  	
+  	</dependency>
   	<dependency>
   		<groupId>junit</groupId>
   		<artifactId>junit</artifactId>
   		<version>4.8.1</version>
   		<type>jar</type>
   		<scope>test</scope>
-  	</dependency>  	  		
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
index 5f333f3..59c047c 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
@@ -22,9 +22,9 @@
 
 public interface IRTreeInteriorFrame extends IRTreeFrame {
 
-    public boolean findBestChild(ITupleReference tuple, MultiComparator cmp);
+    public int findBestChild(ITupleReference tuple, MultiComparator cmp);
 
-    public int getBestChildPageId();
+    public boolean checkIfEnlarementIsNeeded(ITupleReference tuple, MultiComparator cmp);
 
     public int getChildPageId(int tupleIndex);
 
@@ -36,9 +36,6 @@
 
     public void adjustKey(ITupleReference tuple, int tupleIndex, MultiComparator cmp) throws TreeIndexException;
 
-    public boolean recomputeMBR(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
-
     public void enlarge(ITupleReference tuple, MultiComparator cmp);
 
-    boolean checkEnlargement(ITupleReference tuple, MultiComparator cmp);
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
index 3005785..858a40d 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
@@ -20,8 +20,9 @@
 
 public interface IRTreeLeafFrame extends IRTreeFrame {
 
-	public int findTupleIndex(ITupleReference tuple, MultiComparator cmp);
+    public int findTupleIndex(ITupleReference tuple, MultiComparator cmp);
 
-	public boolean intersect(ITupleReference tuple, int tupleIndex,
-			MultiComparator cmp);
+    public boolean intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+
+    public ITupleReference getBeforeTuple(ITupleReference tuple, int targetTupleIndex, MultiComparator cmp);
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreePolicy.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreePolicy.java
new file mode 100644
index 0000000..a0cc5e8
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreePolicy.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public interface IRTreePolicy {
+    public void split(ITreeIndexFrame leftFrame, ByteBuffer buf, ITreeIndexFrame rightFrame, ISlotManager slotManager,
+            ITreeIndexTupleReference frameTuple, ITupleReference tuple, ISplitKey splitKey);
+
+    public int findBestChildPosition(ITreeIndexFrame frame, ITupleReference tuple, ITreeIndexTupleReference frameTuple,
+            MultiComparator cmp);
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
index 0470da9..5c3b314 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
@@ -19,24 +19,29 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 
 public class RTreeDataflowHelper extends TreeIndexDataflowHelper {
 
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
+    private final RTreePolicyType rtreePolicyType;
 
     public RTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
-            IPrimitiveValueProviderFactory[] valueProviderFactories) {
+            IPrimitiveValueProviderFactory[] valueProviderFactories, RTreePolicyType rtreePolicyType) {
         super(opDesc, ctx, partition);
         this.valueProviderFactories = valueProviderFactories;
+        this.rtreePolicyType = rtreePolicyType;
     }
 
     @Override
     public ITreeIndex createIndexInstance() throws HyracksDataException {
-        return RTreeUtils.createRTree(treeOpDesc.getStorageManager().getBufferCache(ctx),
-                treeOpDesc.getTreeIndexTypeTraits(), valueProviderFactories,
-                treeOpDesc.getTreeIndexComparatorFactories());
+        AbstractTreeIndexOperatorDescriptor treeOpDesc = (AbstractTreeIndexOperatorDescriptor) opDesc;
+        return RTreeUtils.createRTree(treeOpDesc.getStorageManager().getBufferCache(ctx), treeOpDesc
+                .getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(),
+                valueProviderFactories, treeOpDesc.getTreeIndexComparatorFactories(), rtreePolicyType, file);
     }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
index 6b9fd4c..06af8ee 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
@@ -20,20 +20,24 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
 public class RTreeDataflowHelperFactory implements IIndexDataflowHelperFactory {
 
     private static final long serialVersionUID = 1L;
 
     private final IPrimitiveValueProviderFactory[] valueProviderFactories;
+    private final RTreePolicyType rtreePolicyType;
 
-    public RTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories) {
+    public RTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories,
+            RTreePolicyType rtreePolicyType) {
         this.valueProviderFactories = valueProviderFactories;
+        this.rtreePolicyType = rtreePolicyType;
     }
 
     @Override
     public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition) {
-        return new RTreeDataflowHelper(opDesc, ctx, partition, valueProviderFactories);
+        return new RTreeDataflowHelper(opDesc, ctx, partition, valueProviderFactories, rtreePolicyType);
     }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index d9b7b97..d718c69 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -23,12 +23,13 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
 
 public class RTreeSearchOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
 
@@ -37,20 +38,22 @@
     protected int[] keyFields; // fields in input tuple to be used as keys
 
     public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
             IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput,
-            IOperationCallbackProvider opCallbackProvider) {
-        super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory, null, retainInput, opCallbackProvider);
+            ISearchOperationCallbackFactory searchOpCallbackFactory) {
+        super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, dataflowHelperFactory, null, retainInput,
+                NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackFactory,
+                NoOpOperationCallbackFactory.INSTANCE);
+
         this.keyFields = keyFields;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new RTreeSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
-                keyFields);
+        return new RTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, keyFields);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index 3781037..de4961b 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -17,22 +17,21 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexSearchOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 
-public class RTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
+public class RTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
     protected PermutingFrameTupleReference searchKey;
     protected MultiComparator cmp;
 
     public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
-            int[] keyFields) {
+            int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
         super(opDesc, ctx, partition, recordDescProvider);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
@@ -42,6 +41,7 @@
 
     @Override
     protected ISearchPredicate createSearchPredicate() {
+        ITreeIndex treeIndex = (ITreeIndex) index;
         cmp = RTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), searchKey);
         return new SearchPredicate(searchKey, cmp);
     }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
new file mode 100644
index 0000000..aafecd5
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreePolicy;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+
+public class RStarTreePolicy implements IRTreePolicy {
+
+    private TupleEntryArrayList tupleEntries1;
+    private TupleEntryArrayList tupleEntries2;
+    private Rectangle[] rec;
+
+    private static final int nearMinimumOverlapFactor = 32;
+    private static final double splitFactor = 0.4;
+    private static final int numTuplesEntries = 100;
+
+    private final ITreeIndexTupleWriter tupleWriter;
+    private final IPrimitiveValueProvider[] keyValueProviders;
+    private ITreeIndexTupleReference cmpFrameTuple;
+    private final int totalFreeSpaceOff;
+
+    public RStarTreePolicy(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders,
+            ITreeIndexTupleReference cmpFrameTuple, int totalFreeSpaceOff) {
+        this.tupleWriter = tupleWriter;
+        this.keyValueProviders = keyValueProviders;
+        this.totalFreeSpaceOff = totalFreeSpaceOff;
+        this.cmpFrameTuple = cmpFrameTuple;
+        tupleEntries1 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
+        tupleEntries2 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
+        rec = new Rectangle[4];
+        for (int i = 0; i < 4; i++) {
+            rec[i] = new Rectangle(keyValueProviders.length / 2);
+        }
+    }
+
+    @Override
+    public void split(ITreeIndexFrame leftFrame, ByteBuffer buf, ITreeIndexFrame rightFrame, ISlotManager slotManager,
+            ITreeIndexTupleReference frameTuple, ITupleReference tuple, ISplitKey splitKey) {
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterleftRTreeFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+
+        RTreeNSMFrame leftRTreeFrame = ((RTreeNSMFrame) leftFrame);
+
+        // calculations are based on the R*-tree paper
+        int m = (int) Math.floor((leftRTreeFrame.getTupleCount() + 1) * splitFactor);
+        int splitDistribution = leftRTreeFrame.getTupleCount() - (2 * m) + 2;
+
+        // to calculate the minimum margin in order to pick the split axis
+        double minMargin = Double.MAX_VALUE;
+        int splitAxis = 0, sortOrder = 0;
+
+        int maxFieldPos = keyValueProviders.length / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            for (int k = 0; k < leftRTreeFrame.getTupleCount(); ++k) {
+
+                frameTuple.resetByTupleIndex(leftRTreeFrame, k);
+                double LowerKey = keyValueProviders[i]
+                        .getValue(frameTuple.getFieldData(i), frameTuple.getFieldStart(i));
+                double UpperKey = keyValueProviders[j]
+                        .getValue(frameTuple.getFieldData(j), frameTuple.getFieldStart(j));
+
+                tupleEntries1.add(k, LowerKey);
+                tupleEntries2.add(k, UpperKey);
+            }
+            double LowerKey = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
+            double UpperKey = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
+
+            tupleEntries1.add(-1, LowerKey);
+            tupleEntries2.add(-1, UpperKey);
+
+            tupleEntries1.sort(EntriesOrder.ASCENDING, leftRTreeFrame.getTupleCount() + 1);
+            tupleEntries2.sort(EntriesOrder.ASCENDING, leftRTreeFrame.getTupleCount() + 1);
+
+            double lowerMargin = 0.0, upperMargin = 0.0;
+            // generate distribution
+            for (int k = 1; k <= splitDistribution; ++k) {
+                int d = m - 1 + k;
+
+                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[0], 0, d);
+                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries2, rec[1], 0, d);
+                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[2], d,
+                        leftRTreeFrame.getTupleCount() + 1);
+                generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries2, rec[3], d,
+                        leftRTreeFrame.getTupleCount() + 1);
+
+                // calculate the margin of the distributions
+                lowerMargin += rec[0].margin() + rec[2].margin();
+                upperMargin += rec[1].margin() + rec[3].margin();
+            }
+            double margin = Math.min(lowerMargin, upperMargin);
+
+            // store minimum margin as split axis
+            if (margin < minMargin) {
+                minMargin = margin;
+                splitAxis = i;
+                sortOrder = (lowerMargin < upperMargin) ? 0 : 2;
+            }
+
+            tupleEntries1.clear();
+            tupleEntries2.clear();
+        }
+
+        for (int i = 0; i < leftRTreeFrame.getTupleCount(); ++i) {
+            frameTuple.resetByTupleIndex(leftRTreeFrame, i);
+            double key = keyValueProviders[splitAxis + sortOrder].getValue(
+                    frameTuple.getFieldData(splitAxis + sortOrder), frameTuple.getFieldStart(splitAxis + sortOrder));
+            tupleEntries1.add(i, key);
+        }
+        double key = keyValueProviders[splitAxis + sortOrder].getValue(tuple.getFieldData(splitAxis + sortOrder),
+                tuple.getFieldStart(splitAxis + sortOrder));
+        tupleEntries1.add(-1, key);
+        tupleEntries1.sort(EntriesOrder.ASCENDING, leftRTreeFrame.getTupleCount() + 1);
+
+        double minArea = Double.MAX_VALUE;
+        double minOverlap = Double.MAX_VALUE;
+        int splitPoint = 0;
+        for (int i = 1; i <= splitDistribution; ++i) {
+            int d = m - 1 + i;
+
+            generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[0], 0, d);
+            generateDist(leftRTreeFrame, frameTuple, tuple, tupleEntries1, rec[2], d,
+                    leftRTreeFrame.getTupleCount() + 1);
+
+            double overlap = rec[0].overlappedArea(rec[2]);
+            if (overlap < minOverlap) {
+                splitPoint = d;
+                minOverlap = overlap;
+                minArea = rec[0].area() + rec[2].area();
+            } else if (overlap == minOverlap) {
+                double area = rec[0].area() + rec[2].area();
+                if (area < minArea) {
+                    splitPoint = d;
+                    minArea = area;
+                }
+            }
+        }
+        int startIndex, endIndex;
+        if (splitPoint < (leftRTreeFrame.getTupleCount() + 1) / 2) {
+            startIndex = 0;
+            endIndex = splitPoint;
+        } else {
+            startIndex = splitPoint;
+            endIndex = (leftRTreeFrame.getTupleCount() + 1);
+        }
+        boolean insertedNewTupleInRightFrame = false;
+        int totalBytes = 0, numOfDeletedTuples = 0;
+        for (int i = startIndex; i < endIndex; i++) {
+            if (tupleEntries1.get(i).getTupleIndex() != -1) {
+                frameTuple.resetByTupleIndex(leftRTreeFrame, tupleEntries1.get(i).getTupleIndex());
+                rightFrame.insert(frameTuple, -1);
+                ((UnorderedSlotManager) slotManager).modifySlot(
+                        slotManager.getSlotOff(tupleEntries1.get(i).getTupleIndex()), -1);
+                totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
+                numOfDeletedTuples++;
+            } else {
+                insertedNewTupleInRightFrame = true;
+            }
+        }
+
+        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
+
+        // maintain space information
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
+                + (slotManager.getSlotSize() * numOfDeletedTuples));
+
+        // compact both pages
+        rightFrame.compact();
+        leftRTreeFrame.compact();
+
+        // The assumption here is that the new tuple cannot be larger than page
+        // size, thus it must fit in either pages.
+        if (insertedNewTupleInRightFrame) {
+            if (rightFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+                rightFrame.insert(tuple, -1);
+            } else {
+                leftRTreeFrame.insert(tuple, -1);
+            }
+        } else if (leftRTreeFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+            leftRTreeFrame.insert(tuple, -1);
+        } else {
+            rightFrame.insert(tuple, -1);
+        }
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, keyValueProviders.length);
+
+        splitKey.initData(splitKeySize);
+        leftRTreeFrame.adjustMBR();
+        rTreeTupleWriterleftRTreeFrame.writeTupleFields(leftRTreeFrame.getTuples(), 0,
+                rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
+
+        ((IRTreeFrame) rightFrame).adjustMBR();
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+                rTreeSplitKey.getRightPageBuffer(), 0);
+        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
+
+        tupleEntries1.clear();
+        tupleEntries2.clear();
+    }
+
+    public void generateDist(ITreeIndexFrame leftRTreeFrame, ITreeIndexTupleReference frameTuple,
+            ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
+        int j = 0;
+        while (entries.get(j).getTupleIndex() == -1) {
+            j++;
+        }
+        frameTuple.resetByTupleIndex(leftRTreeFrame, entries.get(j).getTupleIndex());
+        rec.set(frameTuple, keyValueProviders);
+        for (int i = start; i < end; ++i) {
+            if (i != j) {
+                if (entries.get(i).getTupleIndex() != -1) {
+                    frameTuple.resetByTupleIndex(leftRTreeFrame, entries.get(i).getTupleIndex());
+                    rec.enlarge(frameTuple, keyValueProviders);
+                } else {
+                    rec.enlarge(tuple, keyValueProviders);
+                }
+            }
+        }
+    }
+
+    @Override
+    public int findBestChildPosition(ITreeIndexFrame frame, ITupleReference tuple, ITreeIndexTupleReference frameTuple,
+            MultiComparator cmp) {
+        cmpFrameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+
+        int bestChild = 0;
+        double minEnlargedArea = Double.MAX_VALUE;
+
+        // the children pointers in the node point to leaves
+        if (frame.getLevel() == 1) {
+            // find least overlap enlargement, use minimum enlarged area to
+            // break tie, if tie still exists use minimum area to break it
+            for (int i = 0; i < frame.getTupleCount(); ++i) {
+                frameTuple.resetByTupleIndex(frame, i);
+                double enlargedArea = RTreeComputationUtils.enlargedArea(frameTuple, tuple, cmp, keyValueProviders);
+                tupleEntries1.add(i, enlargedArea);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                }
+            }
+            if (minEnlargedArea < RTreeNSMFrame.doubleEpsilon() || minEnlargedArea > RTreeNSMFrame.doubleEpsilon()) {
+                minEnlargedArea = Double.MAX_VALUE;
+                int k;
+                if (frame.getTupleCount() > nearMinimumOverlapFactor) {
+                    // sort the entries based on their area enlargement needed
+                    // to include the object
+                    tupleEntries1.sort(EntriesOrder.ASCENDING, frame.getTupleCount());
+                    k = nearMinimumOverlapFactor;
+                } else {
+                    k = frame.getTupleCount();
+                }
+
+                double minOverlap = Double.MAX_VALUE;
+                int id = 0;
+                for (int i = 0; i < k; ++i) {
+                    double difference = 0.0;
+                    for (int j = 0; j < frame.getTupleCount(); ++j) {
+                        frameTuple.resetByTupleIndex(frame, j);
+                        cmpFrameTuple.resetByTupleIndex(frame, tupleEntries1.get(i).getTupleIndex());
+
+                        int c = ((RTreeNSMInteriorFrame) frame).pointerCmp(frameTuple, cmpFrameTuple, cmp);
+                        if (c != 0) {
+                            double intersection = RTreeComputationUtils.overlappedArea(frameTuple, tuple,
+                                    cmpFrameTuple, cmp, keyValueProviders);
+                            if (intersection != 0.0) {
+                                difference += intersection
+                                        - RTreeComputationUtils.overlappedArea(frameTuple, null, cmpFrameTuple, cmp,
+                                                keyValueProviders);
+                            }
+                        } else {
+                            id = j;
+                        }
+                    }
+
+                    double enlargedArea = RTreeComputationUtils.enlargedArea(cmpFrameTuple, tuple, cmp,
+                            keyValueProviders);
+                    if (difference < minOverlap) {
+                        minOverlap = difference;
+                        minEnlargedArea = enlargedArea;
+                        bestChild = id;
+                    } else if (difference == minOverlap) {
+                        if (enlargedArea < minEnlargedArea) {
+                            minEnlargedArea = enlargedArea;
+                            bestChild = id;
+                        } else if (enlargedArea == minEnlargedArea) {
+                            double area = RTreeComputationUtils.area(cmpFrameTuple, cmp, keyValueProviders);
+                            frameTuple.resetByTupleIndex(frame, bestChild);
+                            double minArea = RTreeComputationUtils.area(frameTuple, cmp, keyValueProviders);
+                            if (area < minArea) {
+                                bestChild = id;
+                            }
+                        }
+                    }
+                }
+            }
+        } else { // find minimum enlarged area, use minimum area to break tie
+            for (int i = 0; i < frame.getTupleCount(); i++) {
+                frameTuple.resetByTupleIndex(frame, i);
+                double enlargedArea = RTreeComputationUtils.enlargedArea(frameTuple, tuple, cmp, keyValueProviders);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                } else if (enlargedArea == minEnlargedArea) {
+                    double area = RTreeComputationUtils.area(frameTuple, cmp, keyValueProviders);
+                    frameTuple.resetByTupleIndex(frame, bestChild);
+                    double minArea = RTreeComputationUtils.area(frameTuple, cmp, keyValueProviders);
+                    if (area < minArea) {
+                        bestChild = i;
+                    }
+                }
+            }
+        }
+        tupleEntries1.clear();
+
+        return bestChild;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeComputationUtils.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeComputationUtils.java
new file mode 100644
index 0000000..f0122b3
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeComputationUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public class RTreeComputationUtils {
+
+    public static double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp,
+            IPrimitiveValueProvider[] keyValueProviders) {
+        double areaBeforeEnlarge = RTreeComputationUtils.area(tuple, cmp, keyValueProviders);
+        double areaAfterEnlarge = 1.0;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh, pLow;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), tupleToBeInserted.getFieldData(i), tupleToBeInserted.getFieldStart(i),
+                    tupleToBeInserted.getFieldLength(i));
+            if (c < 0) {
+                pLow = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
+            } else {
+                pLow = keyValueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i));
+            }
+
+            c = cmp.getComparators()[j].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    tupleToBeInserted.getFieldData(j), tupleToBeInserted.getFieldStart(j),
+                    tupleToBeInserted.getFieldLength(j));
+            if (c > 0) {
+                pHigh = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
+            } else {
+                pHigh = keyValueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j));
+            }
+            areaAfterEnlarge *= pHigh - pLow;
+        }
+        return areaAfterEnlarge - areaBeforeEnlarge;
+    }
+
+    public static double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted,
+            ITupleReference tuple2, MultiComparator cmp, IPrimitiveValueProvider[] keyValueProviders) {
+        double area = 1.0;
+        double f1, f2;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh1, pLow1;
+            if (tupleToBeInserted != null) {
+                int c = cmp.getComparators()[i].compare(tuple1.getFieldData(i), tuple1.getFieldStart(i),
+                        tuple1.getFieldLength(i), tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i), tupleToBeInserted.getFieldLength(i));
+                if (c < 0) {
+                    pLow1 = keyValueProviders[i].getValue(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                } else {
+                    pLow1 = keyValueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
+                            tupleToBeInserted.getFieldStart(i));
+                }
+
+                c = cmp.getComparators()[j].compare(tuple1.getFieldData(j), tuple1.getFieldStart(j),
+                        tuple1.getFieldLength(j), tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j), tupleToBeInserted.getFieldLength(j));
+                if (c > 0) {
+                    pHigh1 = keyValueProviders[j].getValue(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+                } else {
+                    pHigh1 = keyValueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
+                            tupleToBeInserted.getFieldStart(j));
+                }
+            } else {
+                pLow1 = keyValueProviders[i].getValue(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                pHigh1 = keyValueProviders[j].getValue(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+            }
+
+            double pLow2 = keyValueProviders[i].getValue(tuple2.getFieldData(i), tuple2.getFieldStart(i));
+            double pHigh2 = keyValueProviders[j].getValue(tuple2.getFieldData(j), tuple2.getFieldStart(j));
+
+            if (pLow1 > pHigh2 || pHigh1 < pLow2) {
+                return 0.0;
+            }
+
+            f1 = Math.max(pLow1, pLow2);
+            f2 = Math.min(pHigh1, pHigh2);
+            area *= f2 - f1;
+        }
+        return area;
+    }
+
+    public static double area(ITupleReference tuple, MultiComparator cmp, IPrimitiveValueProvider[] keyValueProviders) {
+        double area = 1.0;
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            area *= keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j))
+                    - keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
+        }
+        return area;
+    }
+
+    public static boolean containsRegion(ITupleReference tuple1, ITupleReference tuple2, MultiComparator cmp,
+            IPrimitiveValueProvider[] keyValueProviders) {
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = cmp.getComparators()[i]
+                    .compare(tuple1.getFieldData(i), tuple1.getFieldStart(i), tuple1.getFieldLength(i),
+                            tuple2.getFieldData(i), tuple2.getFieldStart(i), tuple2.getFieldLength(i));
+            if (c > 0) {
+                return false;
+            }
+
+            c = cmp.getComparators()[j]
+                    .compare(tuple1.getFieldData(j), tuple1.getFieldStart(j), tuple1.getFieldLength(j),
+                            tuple2.getFieldData(j), tuple2.getFieldStart(j), tuple2.getFieldLength(j));
+            if (c < 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
index 84e66ef..eeada0a 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
@@ -21,15 +21,11 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreePolicy;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
-import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
 
 public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeFrame {
     protected static final int pageNsnOff = smFlagOff + 1;
@@ -37,33 +33,27 @@
 
     protected ITreeIndexTupleReference[] tuples;
     protected ITreeIndexTupleReference cmpFrameTuple;
-    protected TupleEntryArrayList tupleEntries1; // used for split and checking
-                                                 // enlargement
-    protected TupleEntryArrayList tupleEntries2; // used for split
 
-    protected Rectangle[] rec;
-
-    protected static final double splitFactor = 0.4;
-    protected static final int nearMinimumOverlapFactor = 32;
     private static final double doubleEpsilon = computeDoubleEpsilon();
-    private static final int numTuplesEntries = 100;
     protected final IPrimitiveValueProvider[] keyValueProviders;
 
-    public RTreeNSMFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders) {
+    protected IRTreePolicy rtreePolicy;
+
+    public RTreeNSMFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders,
+            RTreePolicyType rtreePolicyType) {
         super(tupleWriter, new UnorderedSlotManager());
         this.tuples = new ITreeIndexTupleReference[keyValueProviders.length];
         for (int i = 0; i < keyValueProviders.length; i++) {
             this.tuples[i] = tupleWriter.createTupleReference();
         }
         cmpFrameTuple = tupleWriter.createTupleReference();
-
-        tupleEntries1 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
-        tupleEntries2 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
-        rec = new Rectangle[4];
-        for (int i = 0; i < 4; i++) {
-            rec[i] = new Rectangle(keyValueProviders.length / 2);
-        }
         this.keyValueProviders = keyValueProviders;
+
+        if (rtreePolicyType == RTreePolicyType.RTREE) {
+            rtreePolicy = new RTreePolicy(tupleWriter, keyValueProviders, cmpFrameTuple, totalFreeSpaceOff);
+        } else {
+            rtreePolicy = new RStarTreePolicy(tupleWriter, keyValueProviders, cmpFrameTuple, totalFreeSpaceOff);
+        }
     }
 
     private static double computeDoubleEpsilon() {
@@ -116,184 +106,17 @@
         buf.putInt(rightPageOff, rightPage);
     }
 
-    protected ITreeIndexTupleReference[] getTuples() {
+    public ITreeIndexTupleReference[] getTuples() {
         return tuples;
     }
 
     @Override
-    public void split(ITreeIndexFrame rightFrame, ITupleReference tuple, ISplitKey splitKey) throws TreeIndexException {
-        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
-        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
-        RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
-
-        // calculations are based on the R*-tree paper
-        int m = (int) Math.floor((getTupleCount() + 1) * splitFactor);
-        int splitDistribution = getTupleCount() - (2 * m) + 2;
-
-        // to calculate the minimum margin in order to pick the split axis
-        double minMargin = Double.MAX_VALUE;
-        int splitAxis = 0, sortOrder = 0;
-
-        int maxFieldPos = keyValueProviders.length / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            for (int k = 0; k < getTupleCount(); ++k) {
-
-                frameTuple.resetByTupleIndex(this, k);
-                double LowerKey = keyValueProviders[i]
-                        .getValue(frameTuple.getFieldData(i), frameTuple.getFieldStart(i));
-                double UpperKey = keyValueProviders[j]
-                        .getValue(frameTuple.getFieldData(j), frameTuple.getFieldStart(j));
-
-                tupleEntries1.add(k, LowerKey);
-                tupleEntries2.add(k, UpperKey);
-            }
-            double LowerKey = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
-            double UpperKey = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
-
-            tupleEntries1.add(-1, LowerKey);
-            tupleEntries2.add(-1, UpperKey);
-
-            tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
-            tupleEntries2.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
-
-            double lowerMargin = 0.0, upperMargin = 0.0;
-            // generate distribution
-            for (int k = 1; k <= splitDistribution; ++k) {
-                int d = m - 1 + k;
-
-                generateDist(tuple, tupleEntries1, rec[0], 0, d);
-                generateDist(tuple, tupleEntries2, rec[1], 0, d);
-                generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
-                generateDist(tuple, tupleEntries2, rec[3], d, getTupleCount() + 1);
-
-                // calculate the margin of the distributions
-                lowerMargin += rec[0].margin() + rec[2].margin();
-                upperMargin += rec[1].margin() + rec[3].margin();
-            }
-            double margin = Math.min(lowerMargin, upperMargin);
-
-            // store minimum margin as split axis
-            if (margin < minMargin) {
-                minMargin = margin;
-                splitAxis = i;
-                sortOrder = (lowerMargin < upperMargin) ? 0 : 2;
-            }
-
-            tupleEntries1.clear();
-            tupleEntries2.clear();
-        }
-
-        for (int i = 0; i < getTupleCount(); ++i) {
-            frameTuple.resetByTupleIndex(this, i);
-            double key = keyValueProviders[splitAxis + sortOrder].getValue(
-                    frameTuple.getFieldData(splitAxis + sortOrder), frameTuple.getFieldStart(splitAxis + sortOrder));
-            tupleEntries1.add(i, key);
-        }
-        double key = keyValueProviders[splitAxis + sortOrder].getValue(tuple.getFieldData(splitAxis + sortOrder),
-                tuple.getFieldStart(splitAxis + sortOrder));
-        tupleEntries1.add(-1, key);
-        tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
-
-        double minArea = Double.MAX_VALUE;
-        double minOverlap = Double.MAX_VALUE;
-        int splitPoint = 0;
-        for (int i = 1; i <= splitDistribution; ++i) {
-            int d = m - 1 + i;
-
-            generateDist(tuple, tupleEntries1, rec[0], 0, d);
-            generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
-
-            double overlap = rec[0].overlappedArea(rec[2]);
-            if (overlap < minOverlap) {
-                splitPoint = d;
-                minOverlap = overlap;
-                minArea = rec[0].area() + rec[2].area();
-            } else if (overlap == minOverlap) {
-                double area = rec[0].area() + rec[2].area();
-                if (area < minArea) {
-                    splitPoint = d;
-                    minArea = area;
-                }
-            }
-        }
-        int startIndex, endIndex;
-        if (splitPoint < (getTupleCount() + 1) / 2) {
-            startIndex = 0;
-            endIndex = splitPoint;
-        } else {
-            startIndex = splitPoint;
-            endIndex = (getTupleCount() + 1);
-        }
-        boolean tupleInserted = false;
-        int totalBytes = 0, numOfDeletedTuples = 0;
-        for (int i = startIndex; i < endIndex; i++) {
-            if (tupleEntries1.get(i).getTupleIndex() != -1) {
-                frameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
-                rightFrame.insert(frameTuple, -1);
-                ((UnorderedSlotManager) slotManager).modifySlot(
-                        slotManager.getSlotOff(tupleEntries1.get(i).getTupleIndex()), -1);
-                totalBytes += getTupleSize(frameTuple);
-                numOfDeletedTuples++;
-            } else {
-                rightFrame.insert(tuple, -1);
-                tupleInserted = true;
-            }
-        }
-
-        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
-
-        // maintain space information
-        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
-                + (slotManager.getSlotSize() * numOfDeletedTuples));
-
-        // compact both pages
-        rightFrame.compact();
-        compact();
-
-        if (!tupleInserted) {
-            insert(tuple, -1);
-        }
-
-        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
-        frameTuple.resetByTupleOffset(buf, tupleOff);
-        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, keyValueProviders.length);
-
-        splitKey.initData(splitKeySize);
-        this.adjustMBR();
-        rTreeTupleWriterLeftFrame.writeTupleFields(getTuples(), 0, rTreeSplitKey.getLeftPageBuffer(), 0);
-        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
-
-        ((IRTreeFrame) rightFrame).adjustMBR();
-        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
-                rTreeSplitKey.getRightPageBuffer(), 0);
-        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
-
-        tupleEntries1.clear();
-        tupleEntries2.clear();
+    public void split(ITreeIndexFrame rightFrame, ITupleReference tuple, ISplitKey splitKey) {
+        rtreePolicy.split(this, buf, rightFrame, slotManager, frameTuple, tuple, splitKey);
     }
 
     abstract public int getTupleSize(ITupleReference tuple);
 
-    public void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
-        int j = 0;
-        while (entries.get(j).getTupleIndex() == -1) {
-            j++;
-        }
-        frameTuple.resetByTupleIndex(this, entries.get(j).getTupleIndex());
-        rec.set(frameTuple, keyValueProviders);
-        for (int i = start; i < end; ++i) {
-            if (i != j) {
-                if (entries.get(i).getTupleIndex() != -1) {
-                    frameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
-                    rec.enlarge(frameTuple, keyValueProviders);
-                } else {
-                    rec.enlarge(tuple, keyValueProviders);
-                }
-            }
-        }
-    }
-
     public void adjustMBRImpl(ITreeIndexTupleReference[] tuples) {
         int maxFieldPos = keyValueProviders.length / 2;
         for (int i = 1; i < getTupleCount(); i++) {
@@ -328,6 +151,11 @@
 
     @Override
     public int getPageHeaderSize() {
-        return rightPageOff;
+        return rightPageOff + 4;
+    }
+
+    @Override
+    public void setMultiComparator(MultiComparator cmp) {
+        // currently, R-Tree Frames are unsorted
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
index 63387ef..5ab9632 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
@@ -31,7 +31,6 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.SlotOffTupleOff;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.PathList;
 
 public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteriorFrame {
@@ -41,109 +40,24 @@
             .createBinaryComparator();
     private final int keyFieldCount;
 
-    public RTreeNSMInteriorFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders) {
-        super(tupleWriter, keyValueProviders);
+    public RTreeNSMInteriorFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders,
+            RTreePolicyType rtreePolicyType) {
+        super(tupleWriter, keyValueProviders, rtreePolicyType);
         keyFieldCount = keyValueProviders.length;
         frameTuple.setFieldCount(keyFieldCount);
     }
 
     @Override
-    public boolean findBestChild(ITupleReference tuple, MultiComparator cmp) {
-        cmpFrameTuple.setFieldCount(cmp.getKeyFieldCount());
-        frameTuple.setFieldCount(cmp.getKeyFieldCount());
-
-        int bestChild = 0;
-        double minEnlargedArea = Double.MAX_VALUE;
-
-        // the children pointers in the node point to leaves
-        if (getLevel() == 1) {
-            // find least overlap enlargement, use minimum enlarged area to
-            // break tie, if tie still exists use minimum area to break it
-            for (int i = 0; i < getTupleCount(); ++i) {
-                frameTuple.resetByTupleIndex(this, i);
-                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
-                tupleEntries1.add(i, enlargedArea);
-                if (enlargedArea < minEnlargedArea) {
-                    minEnlargedArea = enlargedArea;
-                    bestChild = i;
-                }
-            }
-            if (minEnlargedArea < RTreeNSMFrame.doubleEpsilon() || minEnlargedArea > RTreeNSMFrame.doubleEpsilon()) {
-                minEnlargedArea = Double.MAX_VALUE;
-                int k;
-                if (getTupleCount() > nearMinimumOverlapFactor) {
-                    // sort the entries based on their area enlargement needed
-                    // to include the object
-                    tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount());
-                    k = nearMinimumOverlapFactor;
-                } else {
-                    k = getTupleCount();
-                }
-
-                double minOverlap = Double.MAX_VALUE;
-                int id = 0;
-                for (int i = 0; i < k; ++i) {
-                    double difference = 0.0;
-                    for (int j = 0; j < getTupleCount(); ++j) {
-                        frameTuple.resetByTupleIndex(this, j);
-                        cmpFrameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
-
-                        int c = pointerCmp(frameTuple, cmpFrameTuple, cmp);
-                        if (c != 0) {
-                            double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple, cmp);
-                            if (intersection != 0.0) {
-                                difference += intersection - overlappedArea(frameTuple, null, cmpFrameTuple, cmp);
-                            }
-                        } else {
-                            id = j;
-                        }
-                    }
-
-                    double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
-                    if (difference < minOverlap) {
-                        minOverlap = difference;
-                        minEnlargedArea = enlargedArea;
-                        bestChild = id;
-                    } else if (difference == minOverlap) {
-                        if (enlargedArea < minEnlargedArea) {
-                            minEnlargedArea = enlargedArea;
-                            bestChild = id;
-                        } else if (enlargedArea == minEnlargedArea) {
-                            double area = area(cmpFrameTuple, cmp);
-                            frameTuple.resetByTupleIndex(this, bestChild);
-                            double minArea = area(frameTuple, cmp);
-                            if (area < minArea) {
-                                bestChild = id;
-                            }
-                        }
-                    }
-                }
-            }
-        } else { // find minimum enlarged area, use minimum area to break tie
-            for (int i = 0; i < getTupleCount(); i++) {
-                frameTuple.resetByTupleIndex(this, i);
-                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
-                if (enlargedArea < minEnlargedArea) {
-                    minEnlargedArea = enlargedArea;
-                    bestChild = i;
-                } else if (enlargedArea == minEnlargedArea) {
-                    double area = area(frameTuple, cmp);
-                    frameTuple.resetByTupleIndex(this, bestChild);
-                    double minArea = area(frameTuple, cmp);
-                    if (area < minArea) {
-                        bestChild = i;
-                    }
-                }
-            }
-        }
-        tupleEntries1.clear();
-
+    public int findBestChild(ITupleReference tuple, MultiComparator cmp) {
+        int bestChild = rtreePolicy.findBestChildPosition(this, tuple, frameTuple, cmp);
         frameTuple.resetByTupleIndex(this, bestChild);
-        if (minEnlargedArea > 0.0) {
-            return true;
-        } else {
-            return false;
-        }
+        return buf.getInt(getChildPointerOff(frameTuple));
+    }
+
+    // frameTuple is assumed to have the tuple to be tested against.
+    @Override
+    public boolean checkIfEnlarementIsNeeded(ITupleReference tuple, MultiComparator cmp) {
+        return !RTreeComputationUtils.containsRegion(frameTuple, tuple, cmp, keyValueProviders);
     }
 
     @Override
@@ -154,11 +68,6 @@
     }
 
     @Override
-    public int getBestChildPageId() {
-        return buf.getInt(getChildPointerOff(frameTuple));
-    }
-
-    @Override
     public int findTupleByPointer(ITupleReference tuple, MultiComparator cmp) {
         frameTuple.setFieldCount(cmp.getKeyFieldCount());
         for (int i = 0; i < getTupleCount(); i++) {
@@ -279,7 +188,7 @@
 
     }
 
-    private int pointerCmp(ITupleReference tupleA, ITupleReference tupleB, MultiComparator cmp) {
+    protected int pointerCmp(ITupleReference tupleA, ITupleReference tupleB, MultiComparator cmp) {
         return childPtrCmp
                 .compare(tupleA.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleA), childPtrSize,
                         tupleB.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleB), childPtrSize);
@@ -330,143 +239,6 @@
     }
 
     @Override
-    public boolean recomputeMBR(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
-        frameTuple.setFieldCount(cmp.getKeyFieldCount());
-        frameTuple.resetByTupleIndex(this, tupleIndex);
-
-        int maxFieldPos = cmp.getKeyFieldCount() / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            int c = cmp.getComparators()[i].compare(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
-                    frameTuple.getFieldLength(i), tuple.getFieldData(i), tuple.getFieldStart(i),
-                    tuple.getFieldLength(i));
-            if (c != 0) {
-                return true;
-            }
-            c = cmp.getComparators()[j].compare(frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
-                    frameTuple.getFieldLength(j), tuple.getFieldData(j), tuple.getFieldStart(j),
-                    tuple.getFieldLength(j));
-
-            if (c != 0) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted, ITupleReference tuple2,
-            MultiComparator cmp) {
-        double area = 1.0;
-        double f1, f2;
-
-        int maxFieldPos = cmp.getKeyFieldCount() / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            double pHigh1, pLow1;
-            if (tupleToBeInserted != null) {
-                int c = cmp.getComparators()[i].compare(tuple1.getFieldData(i), tuple1.getFieldStart(i),
-                        tuple1.getFieldLength(i), tupleToBeInserted.getFieldData(i),
-                        tupleToBeInserted.getFieldStart(i), tupleToBeInserted.getFieldLength(i));
-                if (c < 0) {
-                    pLow1 = keyValueProviders[i].getValue(tuple1.getFieldData(i), tuple1.getFieldStart(i));
-                } else {
-                    pLow1 = keyValueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
-                            tupleToBeInserted.getFieldStart(i));
-                }
-
-                c = cmp.getComparators()[j].compare(tuple1.getFieldData(j), tuple1.getFieldStart(j),
-                        tuple1.getFieldLength(j), tupleToBeInserted.getFieldData(j),
-                        tupleToBeInserted.getFieldStart(j), tupleToBeInserted.getFieldLength(j));
-                if (c > 0) {
-                    pHigh1 = keyValueProviders[j].getValue(tuple1.getFieldData(j), tuple1.getFieldStart(j));
-                } else {
-                    pHigh1 = keyValueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
-                            tupleToBeInserted.getFieldStart(j));
-                }
-            } else {
-                pLow1 = keyValueProviders[i].getValue(tuple1.getFieldData(i), tuple1.getFieldStart(i));
-                pHigh1 = keyValueProviders[j].getValue(tuple1.getFieldData(j), tuple1.getFieldStart(j));
-            }
-
-            double pLow2 = keyValueProviders[i].getValue(tuple2.getFieldData(i), tuple2.getFieldStart(i));
-            double pHigh2 = keyValueProviders[j].getValue(tuple2.getFieldData(j), tuple2.getFieldStart(j));
-
-            if (pLow1 > pHigh2 || pHigh1 < pLow2) {
-                return 0.0;
-            }
-
-            f1 = Math.max(pLow1, pLow2);
-            f2 = Math.min(pHigh1, pHigh2);
-            area *= f2 - f1;
-        }
-        return area;
-    }
-
-    private double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
-        double areaBeforeEnlarge = area(tuple, cmp);
-        double areaAfterEnlarge = 1.0;
-
-        int maxFieldPos = cmp.getKeyFieldCount() / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            double pHigh, pLow;
-            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
-                    tuple.getFieldLength(i), tupleToBeInserted.getFieldData(i), tupleToBeInserted.getFieldStart(i),
-                    tupleToBeInserted.getFieldLength(i));
-            if (c < 0) {
-                pLow = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
-            } else {
-                pLow = keyValueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
-                        tupleToBeInserted.getFieldStart(i));
-            }
-
-            c = cmp.getComparators()[j].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
-                    tupleToBeInserted.getFieldData(j), tupleToBeInserted.getFieldStart(j),
-                    tupleToBeInserted.getFieldLength(j));
-            if (c > 0) {
-                pHigh = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
-            } else {
-                pHigh = keyValueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
-                        tupleToBeInserted.getFieldStart(j));
-            }
-            areaAfterEnlarge *= pHigh - pLow;
-        }
-        return areaAfterEnlarge - areaBeforeEnlarge;
-    }
-
-    private double area(ITupleReference tuple, MultiComparator cmp) {
-        double area = 1.0;
-        int maxFieldPos = cmp.getKeyFieldCount() / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            area *= keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j))
-                    - keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
-        }
-        return area;
-    }
-
-    @Override
-    public boolean checkEnlargement(ITupleReference tuple, MultiComparator cmp) {
-        int maxFieldPos = cmp.getKeyFieldCount() / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            int c = cmp.getComparators()[i].compare(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
-                    frameTuple.getFieldLength(i), tuple.getFieldData(i), tuple.getFieldStart(i),
-                    tuple.getFieldLength(i));
-            if (c > 0) {
-                return true;
-            }
-            c = cmp.getComparators()[j].compare(frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
-                    frameTuple.getFieldLength(j), tuple.getFieldData(j), tuple.getFieldStart(j),
-                    tuple.getFieldLength(j));
-            if (c < 0) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
     public void enlarge(ITupleReference tuple, MultiComparator cmp) {
         int maxFieldPos = cmp.getKeyFieldCount() / 2;
         for (int i = 0; i < maxFieldPos; i++) {
@@ -509,4 +281,8 @@
     public int getFieldCount() {
         return keyValueProviders.length;
     }
+
+    public int getChildPointerSize() {
+        return childPtrSize;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
index 943a179..fdb0e0a 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
@@ -26,13 +26,16 @@
     private static final long serialVersionUID = 1L;
     private final ITreeIndexTupleWriterFactory tupleWriterFactory;
     private final IPrimitiveValueProviderFactory[] keyValueProviderFactories;
+    private final RTreePolicyType rtreePolicyType;
 
-    public RTreeNSMInteriorFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory, IPrimitiveValueProviderFactory[] keyValueProviderFactories) {
+    public RTreeNSMInteriorFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory,
+            IPrimitiveValueProviderFactory[] keyValueProviderFactories, RTreePolicyType rtreePolicyType) {
         this.tupleWriterFactory = tupleWriterFactory;
         if (keyValueProviderFactories.length % 2 != 0) {
             throw new IllegalArgumentException("The key has different number of dimensions.");
         }
         this.keyValueProviderFactories = keyValueProviderFactories;
+        this.rtreePolicyType = rtreePolicyType;
     }
 
     @Override
@@ -41,11 +44,11 @@
         for (int i = 0; i < keyValueProviders.length; i++) {
             keyValueProviders[i] = keyValueProviderFactories[i].createPrimitiveValueProvider();
         }
-        return new RTreeNSMInteriorFrame(tupleWriterFactory.createTupleWriter(), keyValueProviders);
+        return new RTreeNSMInteriorFrame(tupleWriterFactory.createTupleWriter(), keyValueProviders, rtreePolicyType);
     }
 
-	@Override
-	public ITreeIndexTupleWriterFactory getTupleWriterFactory() {
-		return tupleWriterFactory;
-	}
+    @Override
+    public ITreeIndexTupleWriterFactory getTupleWriterFactory() {
+        return tupleWriterFactory;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
index f1d71ff..d52ef16 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
@@ -24,8 +24,9 @@
 
 public class RTreeNSMLeafFrame extends RTreeNSMFrame implements IRTreeLeafFrame {
 
-    public RTreeNSMLeafFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders) {
-        super(tupleWriter, keyValueProviders);
+    public RTreeNSMLeafFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders,
+            RTreePolicyType rtreePolicyType) {
+        super(tupleWriter, keyValueProviders, rtreePolicyType);
     }
 
     @Override
@@ -96,4 +97,19 @@
     public int getFieldCount() {
         return frameTuple.getFieldCount();
     }
+
+    public ITupleReference getBeforeTuple(ITupleReference tuple, int targetTupleIndex, MultiComparator cmp) {
+        // Examine the tuple index to determine whether it is valid or not.
+        if (targetTupleIndex != slotManager.getGreatestKeyIndicator()) {
+            // We need to check the key to determine whether it's an insert or an update.
+            frameTuple.resetByTupleIndex(this, targetTupleIndex);
+            if (cmp.compare(tuple, frameTuple) == 0) {
+                // The keys match, it's an update.
+                return frameTuple;
+            }
+        }
+        // Either the tuple index is a special indicator, or the keys don't match.
+        // In those cases, we are definitely dealing with an insert.
+        return null;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
index e31148f..b4d382b 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
@@ -26,13 +26,16 @@
     private static final long serialVersionUID = 1L;
     private final ITreeIndexTupleWriterFactory tupleWriterFactory;
     private final IPrimitiveValueProviderFactory[] keyValueProviderFactories;
+    private final RTreePolicyType rtreePolicyType;
 
-    public RTreeNSMLeafFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory, IPrimitiveValueProviderFactory[] keyValueProviderFactories) {
+    public RTreeNSMLeafFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory,
+            IPrimitiveValueProviderFactory[] keyValueProviderFactories, RTreePolicyType rtreePolicyType) {
         this.tupleWriterFactory = tupleWriterFactory;
         if (keyValueProviderFactories.length % 2 != 0) {
             throw new IllegalArgumentException("The key has different number of dimensions.");
         }
         this.keyValueProviderFactories = keyValueProviderFactories;
+        this.rtreePolicyType = rtreePolicyType;
     }
 
     @Override
@@ -41,11 +44,11 @@
         for (int i = 0; i < keyValueProviders.length; i++) {
             keyValueProviders[i] = keyValueProviderFactories[i].createPrimitiveValueProvider();
         }
-        return new RTreeNSMLeafFrame(tupleWriterFactory.createTupleWriter(), keyValueProviders);
+        return new RTreeNSMLeafFrame(tupleWriterFactory.createTupleWriter(), keyValueProviders, rtreePolicyType);
     }
 
-	@Override
-	public ITreeIndexTupleWriterFactory getTupleWriterFactory() {
-		return tupleWriterFactory;
-	}
+    @Override
+    public ITreeIndexTupleWriterFactory getTupleWriterFactory() {
+        return tupleWriterFactory;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java
new file mode 100644
index 0000000..9d94794
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreePolicy;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+
+public class RTreePolicy implements IRTreePolicy {
+
+    private Rectangle[] rec;
+
+    private final ITreeIndexTupleWriter tupleWriter;
+    private final IPrimitiveValueProvider[] keyValueProviders;
+    private ITreeIndexTupleReference cmpFrameTuple;
+    private final int totalFreeSpaceOff;
+
+    public RTreePolicy(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders,
+            ITreeIndexTupleReference cmpFrameTuple, int totalFreeSpaceOff) {
+        this.tupleWriter = tupleWriter;
+        this.keyValueProviders = keyValueProviders;
+        this.cmpFrameTuple = cmpFrameTuple;
+        this.totalFreeSpaceOff = totalFreeSpaceOff;
+
+        rec = new Rectangle[2];
+        for (int i = 0; i < 2; i++) {
+            rec[i] = new Rectangle(keyValueProviders.length / 2);
+        }
+    }
+
+    @Override
+    public void split(ITreeIndexFrame leftFrame, ByteBuffer buf, ITreeIndexFrame rightFrame, ISlotManager slotManager,
+            ITreeIndexTupleReference frameTuple, ITupleReference tuple, ISplitKey splitKey) {
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+
+        RTreeNSMFrame leftRTreeFrame = ((RTreeNSMFrame) leftFrame);
+
+        double separation = Double.NEGATIVE_INFINITY;
+        int seed1 = 0, seed2 = 0;
+        int maxFieldPos = keyValueProviders.length / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            frameTuple.resetByTupleIndex(leftRTreeFrame, 0);
+            double leastLowerValue = keyValueProviders[i].getValue(frameTuple.getFieldData(i),
+                    frameTuple.getFieldStart(i));
+            double greatestUpperValue = keyValueProviders[j].getValue(frameTuple.getFieldData(j),
+                    frameTuple.getFieldStart(j));
+            double leastUpperValue = leastLowerValue;
+            double greatestLowerValue = greatestUpperValue;
+            int leastUpperIndex = 0;
+            int greatestLowerIndex = 0;
+            double width;
+
+            int tupleCount = leftRTreeFrame.getTupleCount();
+            for (int k = 1; k < tupleCount; ++k) {
+                frameTuple.resetByTupleIndex(leftRTreeFrame, k);
+                double lowerValue = keyValueProviders[i].getValue(frameTuple.getFieldData(i),
+                        frameTuple.getFieldStart(i));
+                if (lowerValue > greatestLowerValue) {
+                    greatestLowerIndex = k;
+                    cmpFrameTuple.resetByTupleIndex(leftRTreeFrame, k);
+                    greatestLowerValue = keyValueProviders[i].getValue(cmpFrameTuple.getFieldData(i),
+                            cmpFrameTuple.getFieldStart(i));
+                }
+                double higherValue = keyValueProviders[j].getValue(frameTuple.getFieldData(j),
+                        frameTuple.getFieldStart(j));
+                if (higherValue < leastUpperValue) {
+                    leastUpperIndex = k;
+                    cmpFrameTuple.resetByTupleIndex(leftRTreeFrame, k);
+                    leastUpperValue = keyValueProviders[j].getValue(cmpFrameTuple.getFieldData(j),
+                            cmpFrameTuple.getFieldStart(j));
+                }
+
+                leastLowerValue = Math.min(lowerValue, leastLowerValue);
+                greatestUpperValue = Math.max(higherValue, greatestUpperValue);
+            }
+
+            width = greatestUpperValue - leastLowerValue;
+            if (width <= 0) {
+                width = 1;
+            }
+
+            double f = (greatestLowerValue - leastUpperValue) / width;
+
+            if (f > separation) {
+                seed1 = leastUpperIndex;
+                seed2 = greatestLowerIndex;
+                separation = f;
+            }
+        }
+
+        if (seed1 == seed2) {
+            if (seed1 == 0) {
+                seed2 = 1;
+            } else {
+                --seed2;
+            }
+        }
+
+        int totalBytes = 0, numOfDeletedTuples = 0;
+
+        frameTuple.resetByTupleIndex(leftRTreeFrame, seed1);
+        rec[0].set(frameTuple, keyValueProviders);
+        rightFrame.insert(frameTuple, -1);
+        ((UnorderedSlotManager) slotManager).modifySlot(slotManager.getSlotOff(seed1), -1);
+        totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
+        numOfDeletedTuples++;
+
+        frameTuple.resetByTupleIndex(leftRTreeFrame, seed2);
+        rec[1].set(frameTuple, keyValueProviders);
+
+        int remainingTuplestoBeInsertedInRightFrame;
+        for (int k = 0; k < leftRTreeFrame.getTupleCount(); ++k) {
+            remainingTuplestoBeInsertedInRightFrame = leftRTreeFrame.getTupleCount() / 2 - rightFrame.getTupleCount();
+            if (remainingTuplestoBeInsertedInRightFrame == 0) {
+                break;
+            }
+            if (k != seed1 && k != seed2) {
+                frameTuple.resetByTupleIndex(leftRTreeFrame, k);
+                if (rec[0].enlargedArea(frameTuple, keyValueProviders) < rec[1].enlargedArea(frameTuple,
+                        keyValueProviders)
+                        || leftRTreeFrame.getTupleCount() - k <= remainingTuplestoBeInsertedInRightFrame) {
+                    rightFrame.insert(frameTuple, -1);
+                    rec[0].enlarge(frameTuple, keyValueProviders);
+                    ((UnorderedSlotManager) slotManager).modifySlot(slotManager.getSlotOff(k), -1);
+                    totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
+                    numOfDeletedTuples++;
+                } else {
+                    rec[1].enlarge(frameTuple, keyValueProviders);
+                }
+            }
+
+        }
+
+        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
+
+        // maintain space information
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
+                + (slotManager.getSlotSize() * numOfDeletedTuples));
+
+        // compact both pages
+        rightFrame.compact();
+        leftRTreeFrame.compact();
+
+        // The assumption here is that the new tuple cannot be larger than page
+        // size, thus it must fit in either pages.
+        if (rec[0].enlargedArea(tuple, keyValueProviders) < rec[1].enlargedArea(tuple, keyValueProviders)) {
+            if (rightFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+                rightFrame.insert(tuple, -1);
+            } else {
+                leftRTreeFrame.insert(tuple, -1);
+            }
+        } else if (leftRTreeFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+            leftRTreeFrame.insert(tuple, -1);
+        } else {
+            rightFrame.insert(tuple, -1);
+        }
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, keyValueProviders.length);
+
+        splitKey.initData(splitKeySize);
+        leftRTreeFrame.adjustMBR();
+        rTreeTupleWriterLeftFrame.writeTupleFields(leftRTreeFrame.getTuples(), 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
+
+        ((IRTreeFrame) rightFrame).adjustMBR();
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+                rTreeSplitKey.getRightPageBuffer(), 0);
+        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
+    }
+
+    @Override
+    public int findBestChildPosition(ITreeIndexFrame frame, ITupleReference tuple, ITreeIndexTupleReference frameTuple,
+            MultiComparator cmp) {
+        cmpFrameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+
+        int bestChild = 0;
+        double minEnlargedArea = Double.MAX_VALUE;
+
+        // find minimum enlarged area, use minimum area to break tie
+        for (int i = 0; i < frame.getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(frame, i);
+            double enlargedArea = RTreeComputationUtils.enlargedArea(frameTuple, tuple, cmp, keyValueProviders);
+            if (enlargedArea < minEnlargedArea) {
+                minEnlargedArea = enlargedArea;
+                bestChild = i;
+            } else if (enlargedArea == minEnlargedArea) {
+                double area = RTreeComputationUtils.area(frameTuple, cmp, keyValueProviders);
+                frameTuple.resetByTupleIndex(frame, bestChild);
+                double minArea = RTreeComputationUtils.area(frameTuple, cmp, keyValueProviders);
+                if (area < minArea) {
+                    bestChild = i;
+                }
+            }
+        }
+
+        return bestChild;
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicyType.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicyType.java
new file mode 100644
index 0000000..712c424
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicyType.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+public enum RTreePolicyType {
+    RTREE, RSTARTREE
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index cc3cf5b..c12dc50 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -15,88 +15,64 @@
 
 package edu.uci.ics.hyracks.storage.am.rtree.impls;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexType;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
-import edu.uci.ics.hyracks.storage.am.common.impls.TreeDiskOrderScanCursor;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.AbstractTreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.impls.NodeFrontier;
+import edu.uci.ics.hyracks.storage.am.common.impls.TreeIndexDiskOrderScanCursor;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.util.TreeIndexUtils;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
-public class RTree implements ITreeIndex {
-
-    private final int rootPage = 1;
+public class RTree extends AbstractTreeIndex {
 
     // Global node sequence number used for the concurrency control protocol
     private final AtomicLong globalNsn;
-    private final ReadWriteLock treeLatch;
 
-    private final IFreePageManager freePageManager;
-    private final IBufferCache bufferCache;
-    private int fileId;
-
-    private final ITreeIndexFrameFactory interiorFrameFactory;
-    private final ITreeIndexFrameFactory leafFrameFactory;
-    private final int fieldCount;
-    private final IBinaryComparatorFactory[] cmpFactories;
-
-    public RTree(IBufferCache bufferCache, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
-            IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
-            ITreeIndexFrameFactory leafFrameFactory) {
-        this.bufferCache = bufferCache;
-        this.fieldCount = fieldCount;
-        this.cmpFactories = cmpFactories;
-        this.freePageManager = freePageManager;
-        this.interiorFrameFactory = interiorFrameFactory;
-        this.leafFrameFactory = leafFrameFactory;
+    public RTree(IBufferCache bufferCache, IFileMapProvider fileMapProvider, IFreePageManager freePageManager,
+            ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory,
+            IBinaryComparatorFactory[] cmpFactories, int fieldCount, FileReference file) {
+        super(bufferCache, fileMapProvider, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories,
+                fieldCount, file);
         globalNsn = new AtomicLong();
-        this.treeLatch = new ReentrantReadWriteLock(true);
     }
 
     private long incrementGlobalNsn() {
         return globalNsn.incrementAndGet();
     }
 
-    public byte getTreeHeight(IRTreeLeafFrame leafFrame) throws HyracksDataException {
-        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
-        rootNode.acquireReadLatch();
-        try {
-            leafFrame.setPage(rootNode);
-            return leafFrame.getLevel();
-        } finally {
-            rootNode.releaseReadLatch();
-            bufferCache.unpin(rootNode);
-        }
-    }
-
     @SuppressWarnings("rawtypes")
     public String printTree(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame,
             ISerializerDeserializer[] keySerdes) throws Exception {
@@ -162,66 +138,21 @@
         }
     }
 
-    @Override
-    public void create(int fileId) throws HyracksDataException {
-        treeLatch.writeLock().lock();
-        try {
-            ITreeIndexFrame leafFrame = leafFrameFactory.createFrame();
-            ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
-            freePageManager.open(fileId);
-            freePageManager.init(metaFrame, rootPage);
-
-            // initialize root page
-            ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
-
-            rootNode.acquireWriteLatch();
-            try {
-                leafFrame.setPage(rootNode);
-                leafFrame.initBuffer((byte) 0);
-            } finally {
-                rootNode.releaseWriteLatch();
-                bufferCache.unpin(rootNode);
-            }
-        } finally {
-            treeLatch.writeLock().unlock();
-        }
-    }
-
-    @Override
-    public void open(int fileId) {
-        this.fileId = fileId;
-        freePageManager.open(fileId);
-    }
-
-    @Override
-    public void close() {
-        fileId = -1;
-        freePageManager.close();
-    }
-
-    @Override
-    public int getFileId() {
-        return fileId;
-    }
-
-    @Override
-    public IBufferCache getBufferCache() {
-        return bufferCache;
-    }
-
-    private RTreeOpContext createOpContext() {
+    private RTreeOpContext createOpContext(IModificationOperationCallback modificationCallback) {
         return new RTreeOpContext((IRTreeLeafFrame) leafFrameFactory.createFrame(),
                 (IRTreeInteriorFrame) interiorFrameFactory.createFrame(), freePageManager.getMetaDataFrameFactory()
-                        .createFrame(), cmpFactories, 8);
+                        .createFrame(), cmpFactories, 8, modificationCallback);
     }
 
-    private void insert(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
+    private void insert(ITupleReference tuple, IIndexOperationContext ictx) throws HyracksDataException,
+            TreeIndexException {
         RTreeOpContext ctx = (RTreeOpContext) ictx;
         ctx.reset();
         ctx.setTuple(tuple);
         ctx.splitKey.reset();
         ctx.splitKey.getLeftTuple().setFieldCount(cmpFactories.length);
         ctx.splitKey.getRightTuple().setFieldCount(cmpFactories.length);
+        ctx.modificationCallback.before(tuple);
 
         int maxFieldPos = cmpFactories.length / 2;
         for (int i = 0; i < maxFieldPos; i++) {
@@ -232,6 +163,7 @@
                 throw new IllegalArgumentException("The low key point has larger coordinates than the high key point.");
             }
         }
+
         try {
             ICachedPage leafNode = findLeaf(ctx);
 
@@ -323,9 +255,9 @@
                 ctx.pathList.add(pageId, pageLsn, -1);
 
                 if (!isLeaf) {
-                    // findBestChild must be called *before* getBestChildPageId
-                    boolean enlarementIsNeeded = ctx.interiorFrame.findBestChild(ctx.getTuple(), ctx.cmp);
-                    int childPageId = ctx.interiorFrame.getBestChildPageId();
+                    // findBestChild must be called *before* checkIfEnlarementIsNeeded
+                    int childPageId = ctx.interiorFrame.findBestChild(ctx.getTuple(), ctx.cmp);
+                    boolean enlarementIsNeeded = ctx.interiorFrame.checkIfEnlarementIsNeeded(ctx.getTuple(), ctx.cmp);
 
                     if (enlarementIsNeeded) {
                         if (!writeLatched) {
@@ -404,6 +336,7 @@
                     if (!isLeaf) {
                         ctx.interiorFrame.insert(tuple, -1);
                     } else {
+                        ctx.modificationCallback.found(null, tuple);
                         ctx.leafFrame.insert(tuple, -1);
                     }
                     succeeded = true;
@@ -428,6 +361,7 @@
                         ctx.interiorFrame.insert(tuple, -1);
                     } else {
                         ctx.leafFrame.compact();
+                        ctx.modificationCallback.found(null, tuple);
                         ctx.leafFrame.insert(tuple, -1);
                     }
                     succeeded = true;
@@ -464,6 +398,7 @@
                         rightFrame.setPage(rightNode);
                         rightFrame.initBuffer((byte) 0);
                         rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
+                        ctx.modificationCallback.found(null, tuple);
                         ctx.leafFrame.split(rightFrame, tuple, ctx.splitKey);
                         ctx.leafFrame.setRightPage(rightPageId);
                     }
@@ -795,12 +730,14 @@
     }
 
     private void deleteTuple(int tupleIndex, RTreeOpContext ctx) throws HyracksDataException {
+        ITupleReference beforeTuple = ctx.leafFrame.getBeforeTuple(ctx.getTuple(), tupleIndex, ctx.cmp);
+        ctx.modificationCallback.found(beforeTuple, ctx.getTuple());
         ctx.leafFrame.delete(tupleIndex, ctx.cmp);
         ctx.leafFrame.setPageLsn(incrementGlobalNsn());
     }
 
     private void search(ITreeIndexCursor cursor, ISearchPredicate searchPred, RTreeOpContext ctx)
-            throws HyracksDataException, TreeIndexException {
+            throws HyracksDataException, IndexException {
         ctx.reset();
         ctx.cursor = cursor;
 
@@ -810,90 +747,18 @@
         ctx.cursor.open(ctx.cursorInitialState, (SearchPredicate) searchPred);
     }
 
-    @Override
-    public ITreeIndexFrameFactory getInteriorFrameFactory() {
-        return interiorFrameFactory;
-    }
-
-    @Override
-    public ITreeIndexFrameFactory getLeafFrameFactory() {
-        return leafFrameFactory;
-    }
-
-    @Override
-    public IBinaryComparatorFactory[] getComparatorFactories() {
-        return cmpFactories;
-    }
-
-    @Override
-    public IFreePageManager getFreePageManager() {
-        return freePageManager;
-    }
-
     private void update(ITupleReference tuple, RTreeOpContext ctx) {
         throw new UnsupportedOperationException("RTree Update not implemented.");
     }
 
-    public boolean isEmptyTree(IRTreeLeafFrame leafFrame) throws HyracksDataException {
-        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
-        rootNode.acquireReadLatch();
-        try {
-            leafFrame.setPage(rootNode);
-            if (leafFrame.getLevel() == 0 && leafFrame.getTupleCount() == 0) {
-                return true;
-            } else {
-                return false;
-            }
-        } finally {
-            rootNode.releaseReadLatch();
-            bufferCache.unpin(rootNode);
-        }
-    }
-
-    public final class BulkLoadContext implements IIndexBulkLoadContext {
-
-        public ITreeIndexAccessor indexAccessor;
-
-        public BulkLoadContext(float fillFactor, IRTreeFrame leafFrame, IRTreeFrame interiorFrame,
-                ITreeIndexMetaDataFrame metaFrame) throws HyracksDataException {
-            indexAccessor = createAccessor();
-        }
-    }
-
-    @Override
-    public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws HyracksDataException {
-        IRTreeLeafFrame leafFrame = (IRTreeLeafFrame) leafFrameFactory.createFrame();
-        if (!isEmptyTree(leafFrame)) {
-            throw new HyracksDataException("Trying to Bulk-load a non-empty RTree.");
-        }
-
-        BulkLoadContext ctx = new BulkLoadContext(fillFactor, (IRTreeFrame) leafFrameFactory.createFrame(),
-                (IRTreeFrame) interiorFrameFactory.createFrame(), freePageManager.getMetaDataFrameFactory()
-                        .createFrame());
-        return ctx;
-    }
-
-    @Override
-    public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
-        try {
-            ((BulkLoadContext) ictx).indexAccessor.insert(tuple);
-        } catch (Exception e) {
-            throw new HyracksDataException("BulkLoad Error", e);
-        }
-    }
-
-    @Override
-    public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
-    }
-
     private void diskOrderScan(ITreeIndexCursor icursor, RTreeOpContext ctx) throws HyracksDataException {
-        TreeDiskOrderScanCursor cursor = (TreeDiskOrderScanCursor) icursor;
+        TreeIndexDiskOrderScanCursor cursor = (TreeIndexDiskOrderScanCursor) icursor;
         ctx.reset();
 
         MultiComparator cmp = MultiComparator.create(cmpFactories);
         SearchPredicate searchPred = new SearchPredicate(null, cmp);
 
-        int currentPageId = rootPage + 1;
+        int currentPageId = rootPage;
         int maxPageId = freePageManager.getMaxPage(ctx.metaFrame);
 
         ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
@@ -903,6 +768,7 @@
             cursor.setFileId(fileId);
             cursor.setCurrentPageId(currentPageId);
             cursor.setMaxPageId(maxPageId);
+            ctx.cursorInitialState.setOriginialKeyComparator(ctx.cmp);
             ctx.cursorInitialState.setPage(page);
             cursor.open(ctx.cursorInitialState, searchPred);
         } catch (Exception e) {
@@ -913,49 +779,36 @@
     }
 
     @Override
-    public int getRootPageId() {
-        return rootPage;
-    }
-
-    @Override
-    public int getFieldCount() {
-        return fieldCount;
-    }
-
-    @Override
-    public IndexType getIndexType() {
-        return IndexType.RTREE;
-    }
-
-    @Override
-    public ITreeIndexAccessor createAccessor() {
-        return new RTreeAccessor(this);
+    public ITreeIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new RTreeAccessor(this, modificationCallback, searchCallback);
     }
 
     public class RTreeAccessor implements ITreeIndexAccessor {
         private RTree rtree;
         private RTreeOpContext ctx;
 
-        public RTreeAccessor(RTree rtree) {
+        public RTreeAccessor(RTree rtree, IModificationOperationCallback modificationCallback,
+                ISearchOperationCallback searchCallback) {
             this.rtree = rtree;
-            this.ctx = rtree.createOpContext();
+            this.ctx = rtree.createOpContext(modificationCallback);
         }
 
         @Override
         public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.INSERT);
+            ctx.setOperation(IndexOperation.INSERT);
             rtree.insert(tuple, ctx);
         }
 
         @Override
         public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.UPDATE);
+            ctx.setOperation(IndexOperation.UPDATE);
             rtree.update(tuple, ctx);
         }
 
         @Override
         public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
-            ctx.reset(IndexOp.DELETE);
+            ctx.setOperation(IndexOperation.DELETE);
             rtree.delete(tuple, ctx);
         }
 
@@ -968,18 +821,18 @@
         @Override
         public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
                 IndexException {
-            ctx.reset(IndexOp.SEARCH);
+            ctx.setOperation(IndexOperation.SEARCH);
             rtree.search((ITreeIndexCursor) cursor, searchPred, ctx);
         }
 
         @Override
         public ITreeIndexCursor createDiskOrderScanCursor() {
-            return new TreeDiskOrderScanCursor(leafFrameFactory.createFrame());
+            return new TreeIndexDiskOrderScanCursor(leafFrameFactory.createFrame());
         }
 
         @Override
         public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
-            ctx.reset(IndexOp.DISKORDERSCAN);
+            ctx.setOperation(IndexOperation.DISKORDERSCAN);
             rtree.diskOrderScan(cursor, ctx);
         }
 
@@ -990,7 +843,140 @@
         @Override
         public void upsert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
             throw new UnsupportedOperationException(
-                    "The RTree does not suypport the notion of keys, therefore upsert does not make sense.");
+                    "The RTree does not support the notion of keys, therefore upsert does not make sense.");
         }
     }
+
+    @Override
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
+            throws TreeIndexException {
+        // TODO: verifyInput currently does nothing.
+        try {
+            return new RTreeBulkLoader(fillFactor);
+        } catch (HyracksDataException e) {
+            throw new TreeIndexException(e);
+        }
+    }
+
+    public class RTreeBulkLoader extends AbstractTreeIndex.AbstractTreeIndexBulkLoader {
+        ITreeIndexFrame lowerFrame, prevInteriorFrame;
+        RTreeTypeAwareTupleWriter tupleWriter = ((RTreeTypeAwareTupleWriter) interiorFrame.getTupleWriter());
+        ITreeIndexTupleReference mbrTuple = interiorFrame.createTupleReference();
+        ByteBuffer mbr;
+
+        public RTreeBulkLoader(float fillFactor) throws TreeIndexException, HyracksDataException {
+            super(fillFactor);
+            prevInteriorFrame = interiorFrameFactory.createFrame();
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws HyracksDataException {
+            try {
+                NodeFrontier leafFrontier = nodeFrontiers.get(0);
+
+                int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
+                int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
+
+                // try to free space by compression
+                if (spaceUsed + spaceNeeded > leafMaxBytes) {
+                    leafFrame.compress();
+                    spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
+                }
+
+                if (spaceUsed + spaceNeeded > leafMaxBytes) {
+                    propagateBulk(1, false);
+
+                    leafFrontier.pageId = freePageManager.getFreePage(metaFrame);
+
+                    leafFrontier.page.releaseWriteLatch();
+                    bufferCache.unpin(leafFrontier.page);
+
+                    leafFrontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId),
+                            true);
+                    leafFrontier.page.acquireWriteLatch();
+                    leafFrame.setPage(leafFrontier.page);
+                    leafFrame.initBuffer((byte) 0);
+                }
+
+                leafFrame.setPage(leafFrontier.page);
+                leafFrame.insert(tuple, -1);
+            } catch (HyracksDataException e) {
+                handleException();
+                throw e;
+            } catch (RuntimeException e) {
+                handleException();
+                throw e;
+            }
+
+        }
+
+        public void end() throws HyracksDataException {
+            propagateBulk(1, true);
+
+            super.end();
+        }
+
+        protected void propagateBulk(int level, boolean toRoot) throws HyracksDataException {
+            boolean propagated = false;
+
+            if (level == 1)
+                lowerFrame = leafFrame;
+
+            if (lowerFrame.getTupleCount() == 0)
+                return;
+
+            if (level >= nodeFrontiers.size())
+                addLevel();
+
+            ((RTreeNSMFrame) lowerFrame).adjustMBR();
+
+            if (mbr == null) {
+                int bytesRequired = tupleWriter.bytesRequired(((RTreeNSMFrame) lowerFrame).getTuples()[0], 0,
+                        cmp.getKeyFieldCount())
+                        + ((RTreeNSMInteriorFrame) interiorFrame).getChildPointerSize();
+                mbr = ByteBuffer.allocate(bytesRequired);
+            }
+            tupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getTuples(), 0, mbr, 0);
+            mbrTuple.resetByTupleOffset(mbr, 0);
+
+            NodeFrontier frontier = nodeFrontiers.get(level);
+            interiorFrame.setPage(frontier.page);
+
+            interiorFrame.insert(mbrTuple, -1);
+
+            interiorFrame.getBuffer().putInt(
+                    interiorFrame.getTupleOffset(interiorFrame.getTupleCount() - 1) + mbrTuple.getTupleSize(),
+                    nodeFrontiers.get(level - 1).pageId);
+
+            if (interiorFrame.hasSpaceInsert(mbrTuple) != FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE && !toRoot) {
+                lowerFrame = prevInteriorFrame;
+                lowerFrame.setPage(frontier.page);
+
+                propagateBulk(level + 1, toRoot);
+                propagated = true;
+
+                frontier.page.releaseWriteLatch();
+                bufferCache.unpin(frontier.page);
+                frontier.pageId = freePageManager.getFreePage(metaFrame);
+
+                frontier.page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, frontier.pageId), true);
+                frontier.page.acquireWriteLatch();
+                interiorFrame.setPage(frontier.page);
+                interiorFrame.initBuffer((byte) level);
+            }
+
+            if (toRoot && !propagated && level < nodeFrontiers.size() - 1) {
+                lowerFrame = prevInteriorFrame;
+                lowerFrame.setPage(frontier.page);
+                propagateBulk(level + 1, true);
+            }
+
+            leafFrame.setPage(nodeFrontiers.get(0).page);
+        }
+    }
+
+    @Override
+    public void validate() throws HyracksDataException {
+        throw new UnsupportedOperationException("Validation not implemented for R-Trees.");
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
index ac1eb7d..8a7ea8d 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
@@ -16,36 +16,59 @@
 package edu.uci.ics.hyracks.storage.am.rtree.impls;
 
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 
 public class RTreeCursorInitialState implements ICursorInitialState {
 
-	private PathList pathList;
-	private int rootPage;
-	private ICachedPage page; // for disk order scan
+    private PathList pathList;
+    private int rootPage;
+    private ICachedPage page; // for disk order scan
+    private MultiComparator originalKeyCmp;
 
-	public RTreeCursorInitialState(PathList pathList, int rootPage) {
-		this.pathList = pathList;
-		this.rootPage = rootPage;
-	}
+    public RTreeCursorInitialState(PathList pathList, int rootPage) {
+        this.pathList = pathList;
+        this.rootPage = rootPage;
+    }
 
-	public PathList getPathList() {
-		return pathList;
-	}
+    public PathList getPathList() {
+        return pathList;
+    }
 
-	public int getRootPage() {
-		return rootPage;
-	}
+    public int getRootPage() {
+        return rootPage;
+    }
 
-	public void setRootPage(int rootPage) {
-		this.rootPage = rootPage;
-	}
+    public void setRootPage(int rootPage) {
+        this.rootPage = rootPage;
+    }
 
-	public ICachedPage getPage() {
-		return page;
-	}
+    public ICachedPage getPage() {
+        return page;
+    }
 
-	public void setPage(ICachedPage page) {
-		this.page = page;
-	}
+    public void setPage(ICachedPage page) {
+        this.page = page;
+    }
+
+    @Override
+    public MultiComparator getOriginalKeyComparator() {
+        return originalKeyCmp;
+    }
+
+    @Override
+    public void setOriginialKeyComparator(MultiComparator originalCmp) {
+        this.originalKeyCmp = originalCmp;
+    }
+
+    @Override
+    public ISearchOperationCallback getSearchOperationCallback() {
+        return null;
+    }
+
+    @Override
+    public void setSearchOperationCallback(ISearchOperationCallback searchCallback) {
+        // Do nothing
+    }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index 6683444..219ab30 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -19,21 +19,22 @@
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 
-public class RTreeOpContext implements IIndexOpContext {
+public class RTreeOpContext implements IIndexOperationContext {
     private static final int INITIAL_TRAVERSE_LIST_SIZE = 100;
     public final MultiComparator cmp;
     public final IRTreeInteriorFrame interiorFrame;
     public final IRTreeLeafFrame leafFrame;
-    public IndexOp op;
+    public IndexOperation op;
     public ITreeIndexCursor cursor;
     public RTreeCursorInitialState cursorInitialState;
     public ITreeIndexMetaDataFrame metaFrame;
@@ -47,12 +48,22 @@
     public ArrayList<ICachedPage> NSNUpdates;
     public ArrayList<ICachedPage> LSNUpdates;
 
+    public final IModificationOperationCallback modificationCallback;
+
     public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame,
-            ITreeIndexMetaDataFrame metaFrame, IBinaryComparatorFactory[] cmpFactories, int treeHeightHint) {
-        this.cmp = MultiComparator.create(cmpFactories);
+            ITreeIndexMetaDataFrame metaFrame, IBinaryComparatorFactory[] cmpFactories, int treeHeightHint,
+            IModificationOperationCallback modificationCallback) {
+        
+        if (cmpFactories[0] != null) { 
+            this.cmp = MultiComparator.create(cmpFactories);
+        } else {
+            this.cmp = null;
+        }
+        
         this.interiorFrame = interiorFrame;
         this.leafFrame = leafFrame;
         this.metaFrame = metaFrame;
+        this.modificationCallback = modificationCallback;
         pathList = new PathList(treeHeightHint, treeHeightHint);
         NSNUpdates = new ArrayList<ICachedPage>();
         LSNUpdates = new ArrayList<ICachedPage>();
@@ -78,11 +89,11 @@
     }
 
     @Override
-    public void reset(IndexOp newOp) {
+    public void setOperation(IndexOperation newOp) {
         if (op != null && newOp == op) {
             return;
         }
-        if (op != IndexOp.SEARCH && op != IndexOp.DISKORDERSCAN) {
+        if (op != IndexOperation.SEARCH && op != IndexOperation.DISKORDERSCAN) {
             if (splitKey == null) {
                 splitKey = new RTreeSplitKey(interiorFrame.getTupleWriter().createTupleReference(), interiorFrame
                         .getTupleWriter().createTupleReference());
@@ -96,4 +107,9 @@
         }
         this.op = newOp;
     }
+
+    @Override
+    public IndexOperation getOperation() {
+        return op;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
index ee7ec5f..6b5b1b5 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -33,20 +33,20 @@
     private int fileId = -1;
     private ICachedPage page = null;
     private IRTreeInteriorFrame interiorFrame = null;
-    private IRTreeLeafFrame leafFrame = null;
+    protected IRTreeLeafFrame leafFrame = null;
     private IBufferCache bufferCache = null;
 
     private SearchPredicate pred;
     private PathList pathList;
     private int rootPage;
-    private ITupleReference searchKey;
+    protected ITupleReference searchKey;
 
     private int tupleIndex = 0;
     private int tupleIndexInc = 0;
     private int currentTupleIndex = 0;
     private int pageId = -1;
 
-    private MultiComparator cmp;
+    protected MultiComparator cmp;
 
     private ITreeIndexTupleReference frameTuple;
     private boolean readLatched = false;
@@ -70,6 +70,7 @@
         pathList = null;
     }
 
+    @Override
     public ITupleReference getTuple() {
         return frameTuple;
     }
@@ -87,7 +88,7 @@
         return page;
     }
 
-    private boolean fetchNextLeafPage() throws HyracksDataException {
+    protected boolean fetchNextLeafPage() throws HyracksDataException {
         boolean succeeded = false;
         if (readLatched) {
             page.releaseReadLatch();
@@ -117,15 +118,18 @@
                 }
 
                 if (!isLeaf) {
+                    // We do DFS so that we get the tuples ordered (for disk
+                    // RTrees only) in the case we we are using total order
+                    // (such as Hilbert order)
                     if (searchKey != null) {
-                        for (int i = 0; i < interiorFrame.getTupleCount(); i++) {
+                        for (int i = interiorFrame.getTupleCount() - 1; i >= 0; i--) {
                             int childPageId = interiorFrame.getChildPageIdIfIntersect(searchKey, i, cmp);
                             if (childPageId != -1) {
                                 pathList.add(childPageId, pageLsn, -1);
                             }
                         }
                     } else {
-                        for (int i = 0; i < interiorFrame.getTupleCount(); i++) {
+                        for (int i = interiorFrame.getTupleCount() - 1; i >= 0; i--) {
                             int childPageId = interiorFrame.getChildPageId(i);
                             pathList.add(childPageId, pageLsn, -1);
                         }
@@ -230,12 +234,8 @@
     }
 
     @Override
-    public void reset() {
-        try {
-            close();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void reset() throws HyracksDataException {
+        close();
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
index cb9b160..d0f4c71 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
@@ -19,94 +19,120 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
 
 public class Rectangle {
-	private int dim;
-	private double[] low;
-	private double[] high;
+    private int dim;
+    private double[] low;
+    private double[] high;
 
-	public Rectangle(int dim) {
-		this.dim = dim;
-		low = new double[this.dim];
-		high = new double[this.dim];
-	}
+    public Rectangle(int dim) {
+        this.dim = dim;
+        low = new double[this.dim];
+        high = new double[this.dim];
+    }
 
-	public int getDim() {
-		return dim;
-	}
+    public int getDim() {
+        return dim;
+    }
 
-	public double getLow(int i) {
-		return low[i];
-	}
+    public double getLow(int i) {
+        return low[i];
+    }
 
-	public double getHigh(int i) {
-		return high[i];
-	}
+    public double getHigh(int i) {
+        return high[i];
+    }
 
-	public void setLow(int i, double value) {
-		low[i] = value;
-	}
+    public void setLow(int i, double value) {
+        low[i] = value;
+    }
 
-	public void setHigh(int i, double value) {
-		high[i] = value;
-	}
+    public void setHigh(int i, double value) {
+        high[i] = value;
+    }
 
-	public void set(ITupleReference tuple, IPrimitiveValueProvider[] valueProviders) {
-		for (int i = 0; i < getDim(); i++) {
-			int j = i + getDim();
-			setLow(i, valueProviders[i].getValue(
-					tuple.getFieldData(i), tuple.getFieldStart(i)));
-			setHigh(i, valueProviders[j].getValue(
-					tuple.getFieldData(j), tuple.getFieldStart(j)));
-		}
-	}
+    public void set(ITupleReference tuple, IPrimitiveValueProvider[] valueProviders) {
+        for (int i = 0; i < getDim(); i++) {
+            int j = i + getDim();
+            setLow(i, valueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i)));
+            setHigh(i, valueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j)));
+        }
+    }
 
-	public void enlarge(ITupleReference tupleToBeInserted, IPrimitiveValueProvider[] valueProviders) {
-		for (int i = 0; i < getDim(); i++) {
-			int j = getDim() + i;
-			double low = valueProviders[i].getValue(
-					tupleToBeInserted.getFieldData(i),
-					tupleToBeInserted.getFieldStart(i));
-			if (getLow(i) > low) {
-				setLow(i, low);
-			}
-			double high = valueProviders[j].getValue(
-					tupleToBeInserted.getFieldData(j),
-					tupleToBeInserted.getFieldStart(j));
-			if (getHigh(i) < high) {
-				setHigh(i, high);
-			}
-		}
-	}
+    public void enlarge(ITupleReference tupleToBeInserted, IPrimitiveValueProvider[] valueProviders) {
+        for (int i = 0; i < getDim(); i++) {
+            int j = getDim() + i;
+            double low = valueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
+                    tupleToBeInserted.getFieldStart(i));
+            if (getLow(i) > low) {
+                setLow(i, low);
+            }
+            double high = valueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
+                    tupleToBeInserted.getFieldStart(j));
+            if (getHigh(i) < high) {
+                setHigh(i, high);
+            }
+        }
+    }
 
-	public double margin() {
-		double margin = 0.0;
-		double mul = Math.pow(2, (double) getDim() - 1.0);
-		for (int i = 0; i < getDim(); i++) {
-			margin += (getHigh(i) - getLow(i)) * mul;
-		}
-		return margin;
-	}
+    public double enlargedArea(ITupleReference tupleToBeInserted, IPrimitiveValueProvider[] valueProviders) {
+        double areaBeforeEnlarge = area();
+        double areaAfterEnlarge = 1.0;
 
-	public double overlappedArea(Rectangle rec) {
-		double area = 1.0;
-		double f1, f2;
+        for (int i = 0; i < getDim(); i++) {
+            int j = getDim() + i;
 
-		for (int i = 0; i < getDim(); i++) {
-			if (getLow(i) > rec.getHigh(i) || getHigh(i) < rec.getLow(i)) {
-				return 0.0;
-			}
+            double low = valueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
+                    tupleToBeInserted.getFieldStart(i));
+            double lowAfterEnlargement;
+            if (getLow(i) > low) {
+                lowAfterEnlargement = low;
+            } else {
+                lowAfterEnlargement = getLow(i);
+            }
 
-			f1 = Math.max(getLow(i), rec.getLow(i));
-			f2 = Math.min(getHigh(i), rec.getHigh(i));
-			area *= f2 - f1;
-		}
-		return area;
-	}
+            double high = valueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
+                    tupleToBeInserted.getFieldStart(j));
+            double highAfterEnlargement;
+            if (getHigh(i) < high) {
+                highAfterEnlargement = high;
+            } else {
+                highAfterEnlargement = getHigh(i);
+            }
 
-	public double area() {
-		double area = 1.0;
-		for (int i = 0; i < getDim(); i++) {
-			area *= getHigh(i) - getLow(i);
-		}
-		return area;
-	}
+            areaAfterEnlarge *= highAfterEnlargement - lowAfterEnlargement;
+        }
+        return areaAfterEnlarge - areaBeforeEnlarge;
+    }
+
+    public double margin() {
+        double margin = 0.0;
+        double mul = Math.pow(2, (double) getDim() - 1.0);
+        for (int i = 0; i < getDim(); i++) {
+            margin += (getHigh(i) - getLow(i)) * mul;
+        }
+        return margin;
+    }
+
+    public double overlappedArea(Rectangle rec) {
+        double area = 1.0;
+        double f1, f2;
+
+        for (int i = 0; i < getDim(); i++) {
+            if (getLow(i) > rec.getHigh(i) || getHigh(i) < rec.getLow(i)) {
+                return 0.0;
+            }
+
+            f1 = Math.max(getLow(i), rec.getLow(i));
+            f2 = Math.min(getHigh(i), rec.getHigh(i));
+            area *= f2 - f1;
+        }
+        return area;
+    }
+
+    public double area() {
+        double area = 1.0;
+        for (int i = 0; i < getDim(); i++) {
+            area *= getHigh(i) - getLow(i);
+        }
+        return area;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java
new file mode 100644
index 0000000..7fce7e0
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/HilbertDoubleComparator.java
@@ -0,0 +1,180 @@
+package edu.uci.ics.hyracks.storage.am.rtree.linearize;

+

+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;

+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;

+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;

+import edu.uci.ics.hyracks.storage.am.common.ophelpers.DoubleArrayList;

+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IntArrayList;

+import edu.uci.ics.hyracks.storage.am.rtree.impls.DoublePrimitiveValueProviderFactory;

+

+/*

+ * This compares two points based on the hilbert curve. Currently, it only supports

+ * doubles (this can be changed by changing all doubles to ints as there are no

+ * number generics in Java) in the two-dimensional space. For more dimensions, the

+ * state machine has to be automatically generated. The idea of the fractal generation

+ * of the curve is described e.g. in http://dl.acm.org/ft_gateway.cfm?id=383528&type=pdf

+ * 

+ * Unlike the described approach, this comparator does not compute the hilbert value at 

+ * any point. Instead, it only evaluates how the two inputs compare to each other. This

+ * is done by starting at the lowest hilbert resolution and zooming in on the fractal until

+ * the two points are in different quadrants.

+ * 

+ * As a performance optimization, the state of the state machine is saved in a stack and 

+ * maintained over comparisons. The idea behind this is that comparisons are usually in a

+ * similar area (e.g. geo coordinates). Zooming in from [-MAX_VALUE, MAX_VALUE] would take

+ * ~300 steps every time. Instead, the comparator start from the previous state and zooms out

+ * if necessary

+ */

+

+public class HilbertDoubleComparator implements ILinearizeComparator {

+    private final int dim; // dimension

+    private final HilbertState[] states;

+

+    private double[] bounds;

+    private double stepsize;

+    private int state;

+    private IntArrayList stateStack = new IntArrayList(1000, 200);

+    private DoubleArrayList boundsStack = new DoubleArrayList(2000, 400);

+

+    private IPrimitiveValueProvider valueProvider = DoublePrimitiveValueProviderFactory.INSTANCE

+            .createPrimitiveValueProvider();

+

+    private double[] a;

+    private double[] b;

+

+    private class HilbertState {

+        public final int[] nextState;

+        public final int[] position;

+

+        public HilbertState(int[] nextState, int[] order) {

+            this.nextState = nextState;

+            this.position = order;

+        }

+    }

+

+    public HilbertDoubleComparator(int dimension) {

+        if (dimension != 2)

+            throw new IllegalArgumentException();

+        dim = dimension;

+        a = new double[dim];

+        b = new double[dim];

+

+        states = new HilbertState[] { new HilbertState(new int[] { 3, 0, 1, 0 }, new int[] { 0, 1, 3, 2 }),

+                new HilbertState(new int[] { 1, 1, 0, 2 }, new int[] { 2, 1, 3, 0 }),

+                new HilbertState(new int[] { 2, 3, 2, 1 }, new int[] { 2, 3, 1, 0 }),

+                new HilbertState(new int[] { 0, 2, 3, 3 }, new int[] { 0, 3, 1, 2 }) };

+

+        resetStateMachine();

+    }

+

+    private void resetStateMachine() {

+        state = 0;

+        stateStack.clear();

+        stepsize = Double.MAX_VALUE / 2;

+        bounds = new double[dim];

+        boundsStack.clear();

+    }

+

+    public int compare() {

+        boolean equal = true;

+        for (int i = 0; i < dim; i++) {

+            if (a[i] != b[i])

+                equal = false;

+        }

+        if (equal)

+            return 0;

+

+        // We keep the state of the state machine after a comparison. In most

+        // cases,

+        // the needed zoom factor is close to the old one. In this step, we

+        // check if we have

+        // to zoom out

+        while (true) {

+            if (stateStack.size() <= dim) {

+                resetStateMachine();

+                break;

+            }

+            boolean zoomOut = false;

+            for (int i = 0; i < dim; i++) {

+                if (Math.min(a[i], b[i]) <= bounds[i] - 2 * stepsize

+                        || Math.max(a[i], b[i]) >= bounds[i] + 2 * stepsize) {

+                    zoomOut = true;

+                    break;

+                }

+            }

+            state = stateStack.getLast();

+            stateStack.removeLast();

+            for (int j = dim - 1; j >= 0; j--) {

+                bounds[j] = boundsStack.getLast();

+                boundsStack.removeLast();

+            }

+            stepsize *= 2;

+            if (!zoomOut) {

+                state = stateStack.getLast();

+                stateStack.removeLast();

+                for (int j = dim - 1; j >= 0; j--) {

+                    bounds[j] = boundsStack.getLast();

+                    boundsStack.removeLast();

+                }

+                stepsize *= 2;

+                break;

+            }

+        }

+

+        while (true) {

+            stateStack.add(state);

+            for (int j = 0; j < dim; j++) {

+                boundsStack.add(bounds[j]);

+            }

+

+            // Find the quadrant in which A and B are

+            int quadrantA = 0, quadrantB = 0;

+            for (int i = dim - 1; i >= 0; i--) {

+                if (a[i] >= bounds[i])

+                    quadrantA ^= (1 << (dim - i - 1));

+                if (b[i] >= bounds[i])

+                    quadrantB ^= (1 << (dim - i - 1));

+

+                if (a[i] >= bounds[i]) {

+                    bounds[i] += stepsize;

+                } else {

+                    bounds[i] -= stepsize;

+                }

+            }

+

+            stepsize /= 2;

+            if (stepsize <= 2 * DoublePointable.getEpsilon())

+                return 0;

+            // avoid infinite loop due to machine epsilon problems

+

+            if (quadrantA != quadrantB) {

+                // find the position of A and B's quadrants

+                int posA = states[state].position[quadrantA];

+                int posB = states[state].position[quadrantB];

+

+                if (posA < posB)

+                    return -1;

+                else

+                    return 1;

+            }

+

+            state = states[state].nextState[quadrantA];

+        }

+    }

+

+    @Override

+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

+        for (int i = 0; i < dim; i++) {

+            a[i] = DoubleSerializerDeserializer.getDouble(b1, s1 + (i * 8));

+            b[i] = DoubleSerializerDeserializer.getDouble(b2, s2 + (i * 8));

+        }

+

+        return compare();

+    }

+

+    @Override

+    public int getDimensions() {

+        return dim;

+    }

+}

diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/HilbertDoubleComparatorFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/HilbertDoubleComparatorFactory.java
new file mode 100644
index 0000000..e06dba8
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/HilbertDoubleComparatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.rtree.linearize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+
+public class HilbertDoubleComparatorFactory implements ILinearizeComparatorFactory {
+    private static final long serialVersionUID = 1L;
+    
+    private int dim;
+
+    public static HilbertDoubleComparatorFactory get(int dim) {
+        return new HilbertDoubleComparatorFactory(dim);
+    }
+    
+    public HilbertDoubleComparatorFactory(int dim) {
+    	this.dim = dim;
+    }
+
+    @Override
+    public ILinearizeComparator createBinaryComparator() {
+        return new HilbertDoubleComparator(dim);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java
new file mode 100644
index 0000000..ee47761
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparator.java
@@ -0,0 +1,136 @@
+package edu.uci.ics.hyracks.storage.am.rtree.linearize;

+

+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;

+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;

+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;

+import edu.uci.ics.hyracks.storage.am.common.ophelpers.DoubleArrayList;

+import edu.uci.ics.hyracks.storage.am.rtree.impls.DoublePrimitiveValueProviderFactory;

+

+/*

+ * This compares two points based on the z curve. For doubles, we cannot use

+ * the simple bit magic approach. There may, however, be a better approach than this.

+ */

+

+public class ZCurveDoubleComparator implements ILinearizeComparator {

+    private final int dim; // dimension

+

+    private double[] bounds;

+    private double stepsize;

+    private DoubleArrayList boundsStack = new DoubleArrayList(2000, 400);

+

+    private IPrimitiveValueProvider valueProvider = DoublePrimitiveValueProviderFactory.INSTANCE

+            .createPrimitiveValueProvider();

+

+    private double[] a;

+    private double[] b;

+

+    public ZCurveDoubleComparator(int dimension) {

+        dim = dimension;

+        a = new double[dim];

+        b = new double[dim];

+

+        resetStateMachine();

+    }

+

+    private void resetStateMachine() {

+        stepsize = Double.MAX_VALUE / 2;

+        bounds = new double[dim];

+        boundsStack.clear();

+    }

+

+    public int compare() {

+        boolean equal = true;

+        for (int i = 0; i < dim; i++) {

+            if (a[i] != b[i])

+                equal = false;

+        }

+        if (equal)

+            return 0;

+

+        // We keep the state of the state machine after a comparison. In most

+        // cases,

+        // the needed zoom factor is close to the old one. In this step, we

+        // check if we have

+        // to zoom out

+        while (true) {

+            if (boundsStack.size() <= dim) {

+                resetStateMachine();

+                break;

+            }

+            boolean zoomOut = false;

+            for (int i = 0; i < dim; i++) {

+                if (Math.min(a[i], b[i]) <= bounds[i] - 2 * stepsize

+                        || Math.max(a[i], b[i]) >= bounds[i] + 2 * stepsize) {

+                    zoomOut = true;

+                    break;

+                }

+            }

+

+            for (int j = dim - 1; j >= 0; j--) {

+                bounds[j] = boundsStack.getLast();

+                boundsStack.removeLast();

+            }

+            stepsize *= 2;

+            if (!zoomOut) {

+                for (int j = dim - 1; j >= 0; j--) {

+                    bounds[j] = boundsStack.getLast();

+                    boundsStack.removeLast();

+                }

+                stepsize *= 2;

+                break;

+            }

+        }

+

+        while (true) {

+            for (int j = 0; j < dim; j++) {

+                boundsStack.add(bounds[j]);

+            }

+

+            // Find the quadrant in which A and B are

+            int quadrantA = 0, quadrantB = 0;

+            for (int i = dim - 1; i >= 0; i--) {

+                if (a[i] >= bounds[i])

+                    quadrantA ^= (1 << (dim - i - 1));

+                if (b[i] >= bounds[i])

+                    quadrantB ^= (1 << (dim - i - 1));

+

+                if (a[i] >= bounds[i]) {

+                    bounds[i] += stepsize;

+                } else {

+                    bounds[i] -= stepsize;

+                }

+            }

+

+            stepsize /= 2;

+            if (stepsize <= 2 * DoublePointable.getEpsilon())

+                return 0;

+            // avoid infinite loop due to machine epsilon problems

+

+            if (quadrantA != quadrantB) {

+                // find the position of A and B's quadrants

+                if (quadrantA < quadrantB)

+                    return -1;

+                else if (quadrantA > quadrantB)

+                    return 1;

+                else

+                    return 0;

+            }

+        }

+    }

+

+    @Override

+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

+        for (int i = 0; i < dim; i++) {

+            a[i] = DoubleSerializerDeserializer.getDouble(b1, s1 + (i * 8));

+            b[i] = DoubleSerializerDeserializer.getDouble(b2, s2 + (i * 8));

+        }

+

+        return compare();

+    }

+

+    @Override

+    public int getDimensions() {

+        return dim;

+    }

+}

diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparatorFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparatorFactory.java
new file mode 100644
index 0000000..f1b5806
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveDoubleComparatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.rtree.linearize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+
+public class ZCurveDoubleComparatorFactory implements ILinearizeComparatorFactory {
+    private static final long serialVersionUID = 1L;
+    
+    private int dim;
+
+    public static ZCurveDoubleComparatorFactory get(int dim) {
+        return new ZCurveDoubleComparatorFactory(dim);
+    }
+    
+    public ZCurveDoubleComparatorFactory(int dim) {
+    	this.dim = dim;
+    }
+
+    @Override
+    public ILinearizeComparator createBinaryComparator() {
+        return new ZCurveDoubleComparator(dim);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java
new file mode 100644
index 0000000..1f26f41
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveIntComparator.java
@@ -0,0 +1,129 @@
+package edu.uci.ics.hyracks.storage.am.rtree.linearize;

+

+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;

+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;

+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

+import edu.uci.ics.hyracks.storage.am.common.ophelpers.DoubleArrayList;

+

+/*

+ * This compares two points based on the z curve. For doubles, we cannot use

+ * the simple bit magic approach. There may, however, be a better approach than this.

+ */

+

+public class ZCurveIntComparator implements ILinearizeComparator {

+    private final int dim; // dimension

+

+    private double[] bounds;

+    private double stepsize;

+    private DoubleArrayList boundsStack = new DoubleArrayList(2000, 400);

+

+    private int[] a;

+    private int[] b;

+

+    public ZCurveIntComparator(int dimension) {

+        dim = dimension;

+        a = new int[dim];

+        b = new int[dim];

+

+        resetStateMachine();

+    }

+

+    private void resetStateMachine() {

+        stepsize = Integer.MAX_VALUE / 2;

+        bounds = new double[dim];

+        boundsStack.clear();

+    }

+

+    public int compare() {

+        boolean equal = true;

+        for (int i = 0; i < dim; i++) {

+            if (a[i] != b[i])

+                equal = false;

+        }

+        if (equal)

+            return 0;

+

+        // We keep the state of the state machine after a comparison. In most cases,

+        // the needed zoom factor is close to the old one. In this step, we check if we have

+        // to zoom out

+        while (true) {

+            if (boundsStack.size() <= dim) {

+                resetStateMachine();

+                break;

+            }

+            boolean zoomOut = false;

+            for (int i = 0; i < dim; i++) {

+                if (Math.min(a[i], b[i]) <= bounds[i] - 2 * stepsize

+                        || Math.max(a[i], b[i]) >= bounds[i] + 2 * stepsize) {

+                    zoomOut = true;

+                    break;

+                }

+            }

+

+            for (int j = dim - 1; j >= 0; j--) {

+                bounds[j] = boundsStack.getLast();

+                boundsStack.removeLast();

+            }

+            stepsize *= 2;

+            if (!zoomOut) {

+                for (int j = dim - 1; j >= 0; j--) {

+                    bounds[j] = boundsStack.getLast();

+                    boundsStack.removeLast();

+                }

+                stepsize *= 2;

+                break;

+            }

+        }

+

+        while (true) {

+            for (int j = 0; j < dim; j++) {

+                boundsStack.add(bounds[j]);

+            }

+

+            // Find the quadrant in which A and B are

+            int quadrantA = 0, quadrantB = 0;

+            for (int i = dim - 1; i >= 0; i--) {

+                if (a[i] >= bounds[i])

+                    quadrantA ^= (1 << (dim - i - 1));

+                if (b[i] >= bounds[i])

+                    quadrantB ^= (1 << (dim - i - 1));

+

+                if (a[i] >= bounds[i]) {

+                    bounds[i] += stepsize;

+                } else {

+                    bounds[i] -= stepsize;

+                }

+            }

+

+            stepsize /= 2;

+            if (stepsize <= 2 * DoublePointable.getEpsilon())

+                return 0;

+            // avoid infinite loop due to machine epsilon problems

+

+            if (quadrantA != quadrantB) {

+                // find the position of A and B's quadrants

+                if (quadrantA < quadrantB)

+                    return -1;

+                else if (quadrantA > quadrantB)

+                    return 1;

+                else

+                    return 0;

+            }

+        }

+    }

+

+    @Override

+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

+        for (int i = 0; i < dim; i++) {

+            a[i] = IntegerSerializerDeserializer.getInt(b1, s1 + (i * 8));

+            b[i] = IntegerSerializerDeserializer.getInt(b2, s2 + (i * 8));

+        }

+

+        return compare();

+    }

+

+    @Override

+    public int getDimensions() {

+        return dim;

+    }

+}

diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveIntComparatorFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveIntComparatorFactory.java
new file mode 100644
index 0000000..4a35a79
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/linearize/ZCurveIntComparatorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.rtree.linearize;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+
+public class ZCurveIntComparatorFactory implements ILinearizeComparatorFactory {
+    private static final long serialVersionUID = 1L;
+    
+    private int dim;
+
+    public static ZCurveIntComparatorFactory get(int dim) {
+        return new ZCurveIntComparatorFactory(dim);
+    }
+    
+    public ZCurveIntComparatorFactory(int dim) {
+    	this.dim = dim;
+    }
+
+    @Override
+    public ILinearizeComparator createBinaryComparator() {
+        return new ZCurveIntComparator(dim);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java
index 6f5d36f..5889abb 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
@@ -30,24 +31,27 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
 import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class RTreeUtils {
-    public static RTree createRTree(IBufferCache bufferCache, ITypeTraits[] typeTraits,
-            IPrimitiveValueProviderFactory[] valueProviderFactories, IBinaryComparatorFactory[] cmpFactories) {
+    public static RTree createRTree(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            ITypeTraits[] typeTraits, IPrimitiveValueProviderFactory[] valueProviderFactories,
+            IBinaryComparatorFactory[] cmpFactories, RTreePolicyType rtreePolicyType, FileReference file) {
 
         RTreeTypeAwareTupleWriterFactory tupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(typeTraits);
         ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(tupleWriterFactory,
-                valueProviderFactories);
+                valueProviderFactories, rtreePolicyType);
         ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(tupleWriterFactory,
-                valueProviderFactories);
+                valueProviderFactories, rtreePolicyType);
         ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
 
         IFreePageManager freePageManager = new LinkedListFreePageManager(bufferCache, 0, metaFrameFactory);
-        RTree rtree = new RTree(bufferCache, typeTraits.length, cmpFactories, freePageManager, interiorFrameFactory,
-                leafFrameFactory);
+        RTree rtree = new RTree(bufferCache, fileMapProvider, freePageManager, interiorFrameFactory, leafFrameFactory,
+                cmpFactories, typeTraits.length, file);
         return rtree;
     }