Copied hyracks trunk into fullstack

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1958 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-am-rtree/pom.xml b/hyracks/hyracks-storage-am-rtree/pom.xml
new file mode 100644
index 0000000..4c49a95
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/pom.xml
@@ -0,0 +1,56 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-storage-am-rtree</artifactId>
+  <version>0.2.2-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-common</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-common</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.2.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>junit</groupId>
+  		<artifactId>junit</artifactId>
+  		<version>4.8.1</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>  	  		
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IGenericPrimitiveSerializerDeserializer.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IGenericPrimitiveSerializerDeserializer.java
new file mode 100644
index 0000000..fc3ae7c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IGenericPrimitiveSerializerDeserializer.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+
+public interface IGenericPrimitiveSerializerDeserializer<T> extends ISerializerDeserializer<T> {
+	/** Returns a double for the data found in {@code bytes} at {@code offset}; presumably the decoded primitive value - confirm in implementations. */
+	double getValue(byte[] bytes, int offset);
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
new file mode 100644
index 0000000..cad7981
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public interface IRTreeFrame extends ITreeIndexFrame {
+
+    /** Presumably removes the tuple at {@code tupleIndex}; the role of {@code cmp} is not visible here - confirm. */
+    void delete(int tupleIndex, MultiComparator cmp);
+    /** Returns the page NSN kept in the page header (RTreeNSMFrame stores it as a long, initially 0). */
+    long getPageNsn();
+    /** Overwrites the page NSN kept in the page header. */
+    void setPageNsn(long pageNsn);
+    /** Returns the right-page id kept in the page header (RTreeNSMFrame initialises it to -1). */
+    int getRightPage();
+    /** Overwrites the right-page id kept in the page header. */
+    void setRightPage(int rightPage);
+    /** Recomputes which tuples hold the extreme value of each key field, i.e. the page's bounding rectangle. */
+    void adjustMBR();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
new file mode 100644
index 0000000..5f333f3
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.PathList;
+
+public interface IRTreeInteriorFrame extends IRTreeFrame {
+    // NOTE(review): no implementation is visible from here; the contracts of these methods are undocumented - confirm before relying on them.
+    boolean findBestChild(ITupleReference tuple, MultiComparator cmp);
+
+    int getBestChildPageId();
+
+    int getChildPageId(int tupleIndex);
+
+    int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+
+    int findTupleByPointer(ITupleReference tuple, MultiComparator cmp);
+
+    int findTupleByPointer(ITupleReference tuple, PathList traverseList, int parentIndex, MultiComparator cmp);
+
+    void adjustKey(ITupleReference tuple, int tupleIndex, MultiComparator cmp) throws TreeIndexException;
+
+    boolean recomputeMBR(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+
+    void enlarge(ITupleReference tuple, MultiComparator cmp);
+
+    boolean checkEnlargement(ITupleReference tuple, MultiComparator cmp);
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
new file mode 100644
index 0000000..3005785
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public interface IRTreeLeafFrame extends IRTreeFrame {
+	// NOTE(review): contracts are inferred from the names only; the implementations are outside this view.
+	int findTupleIndex(ITupleReference tuple, MultiComparator cmp);
+
+	/** Presumably reports whether {@code tuple} overlaps the entry at {@code tupleIndex} - confirm. */
+	boolean intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
new file mode 100644
index 0000000..0470da9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
+
+public class RTreeDataflowHelper extends TreeIndexDataflowHelper {
+    // Forwarded unchanged to RTreeUtils.createRTree each time an index instance is built.
+    private final IPrimitiveValueProviderFactory[] valueProviderFactories;
+    /** Keeps the value-provider factories; the remaining arguments go straight to the superclass. */
+    public RTreeDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            IPrimitiveValueProviderFactory[] valueProviderFactories) {
+        super(opDesc, ctx, partition);
+        this.valueProviderFactories = valueProviderFactories;
+    }
+    /** Creates the R-tree from the operator descriptor's buffer cache, type traits and comparator factories. */
+    @Override
+    public ITreeIndex createIndexInstance() throws HyracksDataException {
+        return RTreeUtils.createRTree(treeOpDesc.getStorageManager().getBufferCache(ctx),
+                treeOpDesc.getTreeIndexTypeTraits(), valueProviderFactories,
+                treeOpDesc.getTreeIndexComparatorFactories());
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
new file mode 100644
index 0000000..6b9fd4c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelperFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+
+public class RTreeDataflowHelperFactory implements IIndexDataflowHelperFactory {
+
+    private static final long serialVersionUID = 1L;
+    // Handed to every RTreeDataflowHelper this factory creates.
+    private final IPrimitiveValueProviderFactory[] valueProviderFactories;
+
+    public RTreeDataflowHelperFactory(IPrimitiveValueProviderFactory[] valueProviderFactories) {
+        this.valueProviderFactories = valueProviderFactories;
+    }
+    /** Creates an RTreeDataflowHelper for the given operator descriptor, task context and partition. */
+    @Override
+    public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition) {
+        return new RTreeDataflowHelper(opDesc, ctx, partition, valueProviderFactories);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
new file mode 100644
index 0000000..9818dce
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class RTreeSearchOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    protected int[] keyFields; // fields in input tuple to be used as keys
+    /** Creates a search operator with one input and one output; {@code keyFields} selects the input fields used as keys. */
+    public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
+            IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput, IOperationCallbackProvider opCallbackProvider) {
+        // Pass the caller's retainInput through; it was previously hard-coded to false and silently ignored.
+        super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, dataflowHelperFactory, null, retainInput, opCallbackProvider);
+        this.keyFields = keyFields;
+    }
+    /** Creates the search runtime for one partition; {@code nPartitions} is not used. */
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new RTreeSearchOperatorNodePushable(this, ctx, opCallbackProvider, partition, recordDescProvider,
+                keyFields);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
new file mode 100644
index 0000000..3781037
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexSearchOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
+import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
+
+public class RTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
+    protected PermutingFrameTupleReference searchKey;
+    protected MultiComparator cmp;
+    /** Builds the search-key permutation from {@code keyFields} (left null when empty); {@code opCallbackProvider} is unused here. */
+    public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            IOperationCallbackProvider opCallbackProvider, int partition, IRecordDescriptorProvider recordDescProvider,
+            int[] keyFields) {
+        super(opDesc, ctx, partition, recordDescProvider);
+        if (keyFields != null && keyFields.length > 0) {
+            searchKey = new PermutingFrameTupleReference();
+            searchKey.setFieldPermutation(keyFields);
+        }
+    }
+    /** Builds a SearchPredicate over {@code searchKey}, using a comparator obtained from RTreeUtils. */
+    @Override
+    protected ISearchPredicate createSearchPredicate() {
+        cmp = RTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), searchKey);
+        return new SearchPredicate(searchKey, cmp);
+    }
+    /** Re-points {@code searchKey} at tuple {@code tupleIndex} of {@code accessor}; does nothing without a search key. */
+    @Override
+    protected void resetSearchPredicate(int tupleIndex) {
+        if (searchKey != null) {
+            searchKey.reset(accessor, tupleIndex);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
new file mode 100644
index 0000000..84e66ef
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+
+public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeFrame {
+    protected static final int pageNsnOff = smFlagOff + 1; // page NSN, read and written as an 8-byte long
+    protected static final int rightPageOff = pageNsnOff + 8; // right-page id, a 4-byte int; free space starts at rightPageOff + 4
+    // tuples[f] references the frame tuple holding the extreme value of key field f (maintained by adjustMBR).
+    protected ITreeIndexTupleReference[] tuples;
+    protected ITreeIndexTupleReference cmpFrameTuple;
+    protected TupleEntryArrayList tupleEntries1; // used for split and checking
+                                                 // enlargement
+    protected TupleEntryArrayList tupleEntries2; // used for split
+    // Four scratch rectangles; split() uses rec[0]/rec[1] for the first group and rec[2]/rec[3] for the second.
+    protected Rectangle[] rec;
+    // split() keeps at least floor((tupleCount + 1) * splitFactor) entries in each of the two groups.
+    protected static final double splitFactor = 0.4;
+    protected static final int nearMinimumOverlapFactor = 32;
+    private static final double doubleEpsilon = computeDoubleEpsilon();
+    private static final int numTuplesEntries = 100;
+    protected final IPrimitiveValueProvider[] keyValueProviders; // one per key field: all lower bounds first, then all upper bounds
+
+    public RTreeNSMFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders) {
+        super(tupleWriter, new UnorderedSlotManager());
+        this.keyValueProviders = keyValueProviders;
+        // One tuple reference per key field, plus four scratch rectangles of (key fields / 2) dimensions.
+        tuples = new ITreeIndexTupleReference[keyValueProviders.length];
+        for (int field = 0; field < tuples.length; field++) {
+            tuples[field] = tupleWriter.createTupleReference();
+        }
+        cmpFrameTuple = tupleWriter.createTupleReference();
+        tupleEntries1 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
+        tupleEntries2 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
+        rec = new Rectangle[4];
+        for (int r = 0; r < rec.length; r++) {
+            rec[r] = new Rectangle(keyValueProviders.length / 2);
+        }
+    }
+
+    /**
+     * Machine epsilon for {@code double}: the gap between 1.0 and the next representable value.
+     * {@code Math.ulp(1.0)} is the value that repeated halving from 1.0 stops at, i.e. 2^-52.
+     */
+    private static double computeDoubleEpsilon() {
+        return Math.ulp(1.0);
+    }
+
+    /** Returns the machine epsilon for {@code double} computed once at class initialisation. */
+    public static double doubleEpsilon() {
+        return doubleEpsilon;
+    }
+
+    @Override
+    public void initBuffer(byte level) {
+        super.initBuffer(level);
+        buf.putLong(pageNsnOff, 0); // page NSN starts at 0
+        buf.putInt(rightPageOff, -1); // -1 presumably means "no right page yet" - confirm
+    }
+    /** Writes {@code tupleCount} into the header's tuple-count field. */
+    public void setTupleCount(int tupleCount) {
+        buf.putInt(tupleCountOff, tupleCount);
+    }
+    /** Stores the page NSN as a long at {@code pageNsnOff}. */
+    @Override
+    public void setPageNsn(long pageNsn) {
+        buf.putLong(pageNsnOff, pageNsn);
+    }
+    /** Reads the page NSN (a long) from {@code pageNsnOff}. */
+    @Override
+    public long getPageNsn() {
+        return buf.getLong(pageNsnOff);
+    }
+    /** Resets the free-space bookkeeping so that free space begins right after the 4-byte right-page field. */
+    @Override
+    protected void resetSpaceParams() {
+        buf.putInt(freeSpaceOff, rightPageOff + 4);
+        buf.putInt(totalFreeSpaceOff, buf.capacity() - (rightPageOff + 4));
+    }
+    /** Reads the right-page id (an int) from {@code rightPageOff}. */
+    @Override
+    public int getRightPage() {
+        return buf.getInt(rightPageOff);
+    }
+    /** Stores the right-page id as an int at {@code rightPageOff}. */
+    @Override
+    public void setRightPage(int rightPage) {
+        buf.putInt(rightPageOff, rightPage);
+    }
+    /** Exposes the per-field tuple references maintained by {@link #adjustMBR()}. */
+    protected ITreeIndexTupleReference[] getTuples() {
+        return tuples;
+    }
+
+    /**
+     * Splits this frame R*-tree style: picks the axis with the smallest margin sum, then the split point with the
+     * least overlap (ties: least area), moves the smaller group to {@code rightFrame}, places {@code tuple} with its
+     * group and writes both pages' MBRs into {@code splitKey}, which must be an {@link RTreeSplitKey}.
+     */
+    @Override
+    public void split(ITreeIndexFrame rightFrame, ITupleReference tuple, ISplitKey splitKey) throws TreeIndexException {
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+
+        // calculations are based on the R*-tree paper
+        int m = (int) Math.floor((getTupleCount() + 1) * splitFactor);
+        int splitDistribution = getTupleCount() - (2 * m) + 2;
+
+        // to calculate the minimum margin in order to pick the split axis
+        double minMargin = Double.MAX_VALUE;
+        int splitAxis = 0, sortOrder = 0;
+        // key fields hold all lower bounds first, then all upper bounds: axis i pairs field i with field maxFieldPos + i
+        int maxFieldPos = keyValueProviders.length / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            for (int k = 0; k < getTupleCount(); ++k) {
+                frameTuple.resetByTupleIndex(this, k);
+                double LowerKey = keyValueProviders[i]
+                        .getValue(frameTuple.getFieldData(i), frameTuple.getFieldStart(i));
+                double UpperKey = keyValueProviders[j]
+                        .getValue(frameTuple.getFieldData(j), frameTuple.getFieldStart(j));
+                tupleEntries1.add(k, LowerKey);
+                tupleEntries2.add(k, UpperKey);
+            }
+            double LowerKey = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
+            double UpperKey = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
+            // index -1 marks the tuple being inserted, which is not stored in this frame yet
+            tupleEntries1.add(-1, LowerKey);
+            tupleEntries2.add(-1, UpperKey);
+
+            tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+            tupleEntries2.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+
+            double lowerMargin = 0.0, upperMargin = 0.0;
+            // generate distribution
+            for (int k = 1; k <= splitDistribution; ++k) {
+                int d = m - 1 + k;
+                generateDist(tuple, tupleEntries1, rec[0], 0, d);
+                generateDist(tuple, tupleEntries2, rec[1], 0, d);
+                generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
+                generateDist(tuple, tupleEntries2, rec[3], d, getTupleCount() + 1);
+                // calculate the margin of the distributions
+                lowerMargin += rec[0].margin() + rec[2].margin();
+                upperMargin += rec[1].margin() + rec[3].margin();
+            }
+            double margin = Math.min(lowerMargin, upperMargin);
+            // store minimum margin as split axis; sortOrder is the field offset of the bound to sort by:
+            // 0 = lower bound, maxFieldPos = upper bound (a literal 2 here was only correct for 2-D keys)
+            if (margin < minMargin) {
+                minMargin = margin;
+                splitAxis = i;
+                sortOrder = (lowerMargin < upperMargin) ? 0 : maxFieldPos;
+            }
+
+            tupleEntries1.clear();
+            tupleEntries2.clear();
+        }
+        // re-sort every entry (plus the new tuple) along the chosen axis, by the chosen bound
+        for (int i = 0; i < getTupleCount(); ++i) {
+            frameTuple.resetByTupleIndex(this, i);
+            double key = keyValueProviders[splitAxis + sortOrder].getValue(
+                    frameTuple.getFieldData(splitAxis + sortOrder), frameTuple.getFieldStart(splitAxis + sortOrder));
+            tupleEntries1.add(i, key);
+        }
+        double key = keyValueProviders[splitAxis + sortOrder].getValue(tuple.getFieldData(splitAxis + sortOrder),
+                tuple.getFieldStart(splitAxis + sortOrder));
+        tupleEntries1.add(-1, key);
+        tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+        // choose the split point with the least overlap between the two groups; ties go to the smaller total area
+        double minArea = Double.MAX_VALUE;
+        double minOverlap = Double.MAX_VALUE;
+        int splitPoint = 0;
+        for (int i = 1; i <= splitDistribution; ++i) {
+            int d = m - 1 + i;
+            generateDist(tuple, tupleEntries1, rec[0], 0, d);
+            generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
+
+            double overlap = rec[0].overlappedArea(rec[2]);
+            if (overlap < minOverlap) {
+                splitPoint = d;
+                minOverlap = overlap;
+                minArea = rec[0].area() + rec[2].area();
+            } else if (overlap == minOverlap) {
+                double area = rec[0].area() + rec[2].area();
+                if (area < minArea) {
+                    splitPoint = d;
+                    minArea = area;
+                }
+            }
+        }
+        int startIndex, endIndex; // range of sorted entries that moves to rightFrame (the smaller group)
+        if (splitPoint < (getTupleCount() + 1) / 2) {
+            startIndex = 0;
+            endIndex = splitPoint;
+        } else {
+            startIndex = splitPoint;
+            endIndex = (getTupleCount() + 1);
+        }
+        boolean tupleInserted = false;
+        int totalBytes = 0, numOfDeletedTuples = 0;
+        for (int i = startIndex; i < endIndex; i++) {
+            if (tupleEntries1.get(i).getTupleIndex() != -1) {
+                frameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
+                rightFrame.insert(frameTuple, -1);
+                ((UnorderedSlotManager) slotManager).modifySlot(
+                        slotManager.getSlotOff(tupleEntries1.get(i).getTupleIndex()), -1);
+                totalBytes += getTupleSize(frameTuple);
+                numOfDeletedTuples++;
+            } else {
+                rightFrame.insert(tuple, -1);
+                tupleInserted = true;
+            }
+        }
+
+        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
+
+        // maintain space information
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
+                + (slotManager.getSlotSize() * numOfDeletedTuples));
+
+        // compact both pages
+        rightFrame.compact();
+        compact();
+
+        if (!tupleInserted) {
+            insert(tuple, -1);
+        }
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, keyValueProviders.length);
+
+        splitKey.initData(splitKeySize);
+        this.adjustMBR();
+        rTreeTupleWriterLeftFrame.writeTupleFields(getTuples(), 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
+
+        ((IRTreeFrame) rightFrame).adjustMBR();
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+                rTreeSplitKey.getRightPageBuffer(), 0);
+        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
+
+        tupleEntries1.clear();
+        tupleEntries2.clear();
+    }
+
+    public abstract int getTupleSize(ITupleReference tuple); // amount split() credits back to free space per tuple it moves out
+
+    /** Sets {@code rec} to the MBR of the sorted entries in [start, end); tuple index -1 denotes the incoming
+     *  {@code tuple}. An empty range leaves {@code rec} untouched. */
+    public void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
+        for (int i = start; i < end; ++i) {
+            int tupleIndex = entries.get(i).getTupleIndex();
+            ITupleReference member = tuple;
+            if (tupleIndex != -1) {
+                frameTuple.resetByTupleIndex(this, tupleIndex);
+                member = frameTuple;
+            }
+            // seed from the first entry of the range itself, so no entry outside [start, end) leaks into the MBR
+            if (i == start) {
+                rec.set(member, keyValueProviders);
+            } else {
+                rec.enlarge(member, keyValueProviders);
+            }
+        }
+    }
+
+    /** For each key field, re-points tuples[f] at the frame tuple with the smallest (lower bound) or largest (upper bound) value. */
+    public void adjustMBRImpl(ITreeIndexTupleReference[] tuples) {
+        final int dim = keyValueProviders.length / 2;
+        for (int t = 1; t < getTupleCount(); t++) {
+            frameTuple.resetByTupleIndex(this, t);
+            for (int lo = 0, hi = dim; lo < dim; lo++, hi++) {
+                double candidate = keyValueProviders[lo].getValue(frameTuple.getFieldData(lo), frameTuple.getFieldStart(lo));
+                double current = keyValueProviders[lo].getValue(tuples[lo].getFieldData(lo), tuples[lo].getFieldStart(lo));
+                if (candidate < current) {
+                    tuples[lo].resetByTupleIndex(this, t);
+                }
+                candidate = keyValueProviders[hi].getValue(frameTuple.getFieldData(hi), frameTuple.getFieldStart(hi));
+                current = keyValueProviders[hi].getValue(tuples[hi].getFieldData(hi), tuples[hi].getFieldStart(hi));
+                if (candidate > current) {
+                    tuples[hi].resetByTupleIndex(this, t);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void adjustMBR() {
+        // Start every per-field extreme at tuple 0; adjustMBRImpl then scans the remaining tuples.
+        for (ITreeIndexTupleReference extreme : tuples) {
+            extreme.setFieldCount(getFieldCount());
+            extreme.resetByTupleIndex(this, 0);
+        }
+        adjustMBRImpl(tuples);
+    }
+
+    public abstract int getFieldCount(); // field count that adjustMBR() applies to each per-field tuple reference
+
+    @Override
+    public int getPageHeaderSize() {
+        return rightPageOff; // NOTE(review): resetSpaceParams() starts free space at rightPageOff + 4; confirm the 4-byte right-page field is meant to be excluded
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
new file mode 100644
index 0000000..63387ef
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.SlotOffTupleOff;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.PathList;
+
+public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteriorFrame {
+
+    private static final int childPtrSize = 4;
+    private IBinaryComparator childPtrCmp = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
+            .createBinaryComparator();
+    private final int keyFieldCount;
+
+    /**
+     * Creates an interior frame and sizes its frame tuple to one field per key value provider.
+     *
+     * @param tupleWriter
+     *            writer handed to the superclass
+     * @param keyValueProviders
+     *            value providers for the key fields; their count becomes the key field count
+     */
+    public RTreeNSMInteriorFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders) {
+        super(tupleWriter, keyValueProviders);
+        this.keyFieldCount = keyValueProviders.length;
+        this.frameTuple.setFieldCount(this.keyFieldCount);
+    }
+
+    /**
+     * Chooses the entry of this frame under which {@code tuple} should be inserted and leaves
+     * {@code frameTuple} positioned on that entry.
+     * <p>
+     * When {@code getLevel() == 1} the candidates are ranked by the change in overlapped area, with
+     * ties broken first by enlarged area and then by area. On every other level the entry with the
+     * smallest enlarged area wins, with ties broken by area.
+     *
+     * @param tuple
+     *            the tuple about to be inserted
+     * @param cmp
+     *            comparator supplying the key field count and the per-field comparators
+     * @return {@code true} if the enlarged area recorded for the chosen entry is greater than 0.0
+     */
+    @Override
+    public boolean findBestChild(ITupleReference tuple, MultiComparator cmp) {
+        cmpFrameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+
+        int bestChild = 0;
+        double minEnlargedArea = Double.MAX_VALUE;
+
+        // the children pointers in the node point to leaves
+        if (getLevel() == 1) {
+            // find least overlap enlargement, use minimum enlarged area to
+            // break tie, if tie still exists use minimum area to break it
+            for (int i = 0; i < getTupleCount(); ++i) {
+                frameTuple.resetByTupleIndex(this, i);
+                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+                // remember every entry together with its enlarged area so the entries can be ranked below
+                tupleEntries1.add(i, enlargedArea);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                }
+            }
+            // NOTE(review): both sides compare against +doubleEpsilon(), so this condition is true for
+            // every value except exactly doubleEpsilon(). It looks like "|minEnlargedArea| > epsilon"
+            // was intended - confirm before changing, since it decides whether the overlap pass runs.
+            if (minEnlargedArea < RTreeNSMFrame.doubleEpsilon() || minEnlargedArea > RTreeNSMFrame.doubleEpsilon()) {
+                minEnlargedArea = Double.MAX_VALUE;
+                int k;
+                if (getTupleCount() > nearMinimumOverlapFactor) {
+                    // sort the entries based on their area enlargement needed
+                    // to include the object
+                    tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount());
+                    k = nearMinimumOverlapFactor;
+                } else {
+                    k = getTupleCount();
+                }
+
+                double minOverlap = Double.MAX_VALUE;
+                int id = 0;
+                // examine the first k candidates of tupleEntries1
+                for (int i = 0; i < k; ++i) {
+                    double difference = 0.0;
+                    for (int j = 0; j < getTupleCount(); ++j) {
+                        frameTuple.resetByTupleIndex(this, j);
+                        cmpFrameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
+
+                        // entries are identified by their child pointer; c == 0 means entry j is the candidate
+                        int c = pointerCmp(frameTuple, cmpFrameTuple, cmp);
+                        if (c != 0) {
+                            // overlap of entry j, extended to cover the new tuple, with the candidate
+                            double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple, cmp);
+                            if (intersection != 0.0) {
+                                // subtract the overlap the two entries already have without the new tuple
+                                difference += intersection - overlappedArea(frameTuple, null, cmpFrameTuple, cmp);
+                            }
+                        } else {
+                            id = j;
+                        }
+                    }
+
+                    double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
+                    if (difference < minOverlap) {
+                        minOverlap = difference;
+                        minEnlargedArea = enlargedArea;
+                        bestChild = id;
+                    } else if (difference == minOverlap) {
+                        // first tie-break: smaller enlarged area
+                        if (enlargedArea < minEnlargedArea) {
+                            minEnlargedArea = enlargedArea;
+                            bestChild = id;
+                        } else if (enlargedArea == minEnlargedArea) {
+                            // second tie-break: smaller area
+                            double area = area(cmpFrameTuple, cmp);
+                            frameTuple.resetByTupleIndex(this, bestChild);
+                            double minArea = area(frameTuple, cmp);
+                            if (area < minArea) {
+                                bestChild = id;
+                            }
+                        }
+                    }
+                }
+            }
+        } else { // find minimum enlarged area, use minimum area to break tie
+            for (int i = 0; i < getTupleCount(); i++) {
+                frameTuple.resetByTupleIndex(this, i);
+                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                } else if (enlargedArea == minEnlargedArea) {
+                    // area of entry i is taken before frameTuple is moved to the current best entry
+                    double area = area(frameTuple, cmp);
+                    frameTuple.resetByTupleIndex(this, bestChild);
+                    double minArea = area(frameTuple, cmp);
+                    if (area < minArea) {
+                        bestChild = i;
+                    }
+                }
+            }
+        }
+        // tupleEntries1 is only filled in the level-1 branch, but it is cleared on every path
+        tupleEntries1.clear();
+
+        // leave frameTuple on the winner; getBestChildPageId() reads the child pointer from it
+        frameTuple.resetByTupleIndex(this, bestChild);
+        if (minEnlargedArea > 0.0) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Creates a tuple reference through the tuple writer and sizes it to the key field count of
+     * this frame.
+     */
+    @Override
+    public ITreeIndexTupleReference createTupleReference() {
+        final ITreeIndexTupleReference ref = tupleWriter.createTupleReference();
+        ref.setFieldCount(keyFieldCount);
+        return ref;
+    }
+
+    /**
+     * Reads the child page id stored after the last field of the entry {@code frameTuple} currently
+     * points at.
+     */
+    @Override
+    public int getBestChildPageId() {
+        final int childPtrOff = getChildPointerOff(frameTuple);
+        return buf.getInt(childPtrOff);
+    }
+
+    /**
+     * Linearly scans this frame for the entry whose child pointer equals the child pointer of
+     * {@code tuple}.
+     *
+     * @return the index of the matching entry, or -1 if no entry matches
+     */
+    @Override
+    public int findTupleByPointer(ITupleReference tuple, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        int idx = 0;
+        while (idx < getTupleCount()) {
+            frameTuple.resetByTupleIndex(this, idx);
+            if (pointerCmp(frameTuple, tuple, cmp) == 0) {
+                return idx;
+            }
+            idx++;
+        }
+        return -1;
+    }
+
+    /**
+     * Positions {@code frameTuple} on the entry at {@code tupleIndex} and reads the child page id
+     * stored after its last field.
+     */
+    @Override
+    public int getChildPageId(int tupleIndex) {
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+        final int childPtrOff = getChildPointerOff(frameTuple);
+        return buf.getInt(childPtrOff);
+    }
+
+    /**
+     * Returns the child page id of the entry at {@code tupleIndex} if that entry intersects
+     * {@code tuple} in every dimension.
+     *
+     * @return the child page id, or -1 as soon as one dimension is found in which the low side of
+     *         {@code tuple} lies above the entry's high side or the high side of {@code tuple} lies
+     *         below the entry's low side
+     */
+    @Override
+    public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int low = 0, high = dims; low < dims; low++, high++) {
+            // low side of the given tuple against the high side of the entry
+            int order = cmp.getComparators()[low].compare(tuple.getFieldData(low), tuple.getFieldStart(low),
+                    tuple.getFieldLength(low), frameTuple.getFieldData(high), frameTuple.getFieldStart(high),
+                    frameTuple.getFieldLength(high));
+            if (order > 0) {
+                return -1;
+            }
+            // high side of the given tuple against the low side of the entry
+            order = cmp.getComparators()[low].compare(tuple.getFieldData(high), tuple.getFieldStart(high),
+                    tuple.getFieldLength(high), frameTuple.getFieldData(low), frameTuple.getFieldStart(low),
+                    frameTuple.getFieldLength(low));
+            if (order < 0) {
+                return -1;
+            }
+        }
+        final int childPtrOff = getChildPointerOff(frameTuple);
+        return buf.getInt(childPtrOff);
+    }
+
+    /**
+     * Scans this frame for the entry whose child pointer equals that of {@code tuple}. The child
+     * page id of every entry inspected before a match is appended to {@code traverseList} with -1
+     * as second argument and {@code parentIndex} as third.
+     *
+     * @return the index of the matching entry, or -1 if no entry matches
+     */
+    @Override
+    public int findTupleByPointer(ITupleReference tuple, PathList traverseList, int parentIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        for (int idx = 0; idx < getTupleCount(); idx++) {
+            frameTuple.resetByTupleIndex(this, idx);
+
+            if (pointerCmp(frameTuple, tuple, cmp) == 0) {
+                return idx;
+            }
+
+            // Not the entry we are looking for: record its child page for the caller.
+            final int childPageId = IntegerSerializerDeserializer.getInt(
+                    frameTuple.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(frameTuple));
+            traverseList.add(childPageId, -1, parentIndex);
+        }
+        return -1;
+    }
+
+    /**
+     * Rewrites the entries of this frame back to back, in ascending order of their current offset,
+     * starting at the free-space offset read after {@code resetSpaceParams()}. Each entry is copied
+     * together with the child pointer that follows its last field, and its slot is updated to the
+     * new offset. Finally the free-space offset and the total free space are stored.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean compact() {
+        resetSpaceParams();
+
+        final int tupleCount = buf.getInt(tupleCountOff);
+        int writeOff = buf.getInt(freeSpaceOff);
+
+        // Collect (tuple index, slot offset, tuple offset) triples and order them with
+        // SlotOffTupleOff's natural ordering.
+        ArrayList<SlotOffTupleOff> orderedEntries = new ArrayList<SlotOffTupleOff>();
+        orderedEntries.ensureCapacity(tupleCount);
+        for (int idx = 0; idx < tupleCount; idx++) {
+            final int slotOff = slotManager.getSlotOff(idx);
+            orderedEntries.add(new SlotOffTupleOff(idx, slotOff, slotManager.getTupleOff(slotOff)));
+        }
+        Collections.sort(orderedEntries);
+
+        for (SlotOffTupleOff entry : orderedEntries) {
+            final int tupleOff = entry.tupleOff;
+            frameTuple.resetByTupleOffset(buf, tupleOff);
+
+            // The entry ends after its last field plus the trailing child pointer.
+            final int lastField = frameTuple.getFieldCount() - 1;
+            final int tupleEndOff = frameTuple.getFieldStart(lastField) + frameTuple.getFieldLength(lastField);
+            final int tupleLength = tupleEndOff - tupleOff + childPtrSize;
+            System.arraycopy(buf.array(), tupleOff, buf.array(), writeOff, tupleLength);
+
+            slotManager.setSlot(entry.slotOff, writeOff);
+            writeOff += tupleLength;
+        }
+
+        buf.putInt(freeSpaceOff, writeOff);
+        buf.putInt(totalFreeSpaceOff, buf.capacity() - writeOff - tupleCount * slotManager.getSlotSize());
+
+        return false;
+    }
+
+    @Override
+    public FrameOpSpaceStatus hasSpaceInsert(ITupleReference tuple) {
+        int bytesRequired = tupleWriter.bytesRequired(tuple) + childPtrSize;
+        if (bytesRequired + slotManager.getSlotSize() <= buf.capacity() - buf.getInt(freeSpaceOff)
+                - (buf.getInt(tupleCountOff) * slotManager.getSlotSize()))
+            return FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;
+        else if (bytesRequired + slotManager.getSlotSize() <= buf.getInt(totalFreeSpaceOff))
+            return FrameOpSpaceStatus.SUFFICIENT_SPACE;
+        else
+            return FrameOpSpaceStatus.INSUFFICIENT_SPACE;
+    }
+
+    @Override
+    public void adjustKey(ITupleReference tuple, int tupleIndex, MultiComparator cmp) throws TreeIndexException {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        if (tupleIndex == -1) {
+            tupleIndex = findTupleByPointer(tuple, cmp);
+        }
+        if (tupleIndex != -1) {
+            tupleWriter.writeTuple(tuple, buf.array(), getTupleOffset(tupleIndex));
+        } else {
+            throw new TreeIndexException("Error: Faild to find a tuple in a page");
+
+        }
+
+    }
+
+    private int pointerCmp(ITupleReference tupleA, ITupleReference tupleB, MultiComparator cmp) {
+        return childPtrCmp
+                .compare(tupleA.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleA), childPtrSize,
+                        tupleB.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tupleB), childPtrSize);
+    }
+
+    /** Returns the bytes the tuple writer requires for {@code tuple} plus the child pointer size. */
+    public int getTupleSize(ITupleReference tuple) {
+        final int fieldBytes = tupleWriter.bytesRequired(tuple);
+        return fieldBytes + childPtrSize;
+    }
+
+    private int getChildPointerOff(ITupleReference tuple) {
+        return tuple.getFieldStart(tuple.getFieldCount() - 1) + tuple.getFieldLength(tuple.getFieldCount() - 1);
+    }
+
+    /**
+     * Appends {@code tuple} at the current free-space offset: a slot is inserted, the tuple's fields
+     * are written, and the child pointer found after the tuple's last field is copied behind them.
+     * Tuple count, free-space offset and total free space are updated accordingly.
+     *
+     * @param tuple
+     *            the entry to add, carrying its child pointer after the last field
+     * @param tupleIndex
+     *            not used by this implementation
+     */
+    @Override
+    public void insert(ITupleReference tuple, int tupleIndex) {
+        frameTuple.setFieldCount(tuple.getFieldCount());
+        slotManager.insertSlot(-1, buf.getInt(freeSpaceOff));
+
+        final int writeOff = buf.getInt(freeSpaceOff);
+        final int fieldBytes = tupleWriter.writeTupleFields(tuple, 0, tuple.getFieldCount(), buf.array(), writeOff);
+
+        // The child pointer is not a field of its own, so it is copied separately.
+        System.arraycopy(tuple.getFieldData(tuple.getFieldCount() - 1), getChildPointerOff(tuple), buf.array(),
+                writeOff + fieldBytes, childPtrSize);
+        final int entryBytes = fieldBytes + childPtrSize;
+
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) + 1);
+        buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + entryBytes);
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - entryBytes - slotManager.getSlotSize());
+    }
+
+    /**
+     * Removes the entry at {@code tupleIndex} by overwriting its slot: the slots between the slot
+     * end offset and the deleted slot are shifted by one slot size. The tuple bytes themselves are
+     * not moved here; only the tuple count and the total free space are updated.
+     */
+    @Override
+    public void delete(int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+
+        final int deletedSlotOff = slotManager.getSlotOff(tupleIndex);
+        frameTuple.resetByTupleOffset(buf, slotManager.getTupleOff(deletedSlotOff));
+        final int fieldBytes = tupleWriter.bytesRequired(frameTuple);
+
+        // perform deletion (a single memcpy that overwrites the slot)
+        final int slotsBegin = slotManager.getSlotEndOff();
+        final int bytesToShift = deletedSlotOff - slotsBegin;
+        System.arraycopy(buf.array(), slotsBegin, buf.array(), slotsBegin + slotManager.getSlotSize(), bytesToShift);
+
+        // maintain space information: fields, child pointer and slot become free
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) - 1);
+        buf.putInt(totalFreeSpaceOff,
+                buf.getInt(totalFreeSpaceOff) + fieldBytes + childPtrSize + slotManager.getSlotSize());
+    }
+
+    /**
+     * Tells whether the entry at {@code tupleIndex} differs from {@code tuple} in any low-side or
+     * high-side key field.
+     *
+     * @return {@code true} at the first field that compares unequal, {@code false} if all compared
+     *         fields are equal
+     */
+    @Override
+    public boolean recomputeMBR(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int low = 0, high = dims; low < dims; low++, high++) {
+            final int lowOrder = cmp.getComparators()[low].compare(frameTuple.getFieldData(low),
+                    frameTuple.getFieldStart(low), frameTuple.getFieldLength(low), tuple.getFieldData(low),
+                    tuple.getFieldStart(low), tuple.getFieldLength(low));
+            if (lowOrder != 0) {
+                return true;
+            }
+
+            final int highOrder = cmp.getComparators()[high].compare(frameTuple.getFieldData(high),
+                    frameTuple.getFieldStart(high), frameTuple.getFieldLength(high), tuple.getFieldData(high),
+                    tuple.getFieldStart(high), tuple.getFieldLength(high));
+            if (highOrder != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted, ITupleReference tuple2,
+            MultiComparator cmp) {
+        double area = 1.0;
+        double f1, f2;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh1, pLow1;
+            if (tupleToBeInserted != null) {
+                int c = cmp.getComparators()[i].compare(tuple1.getFieldData(i), tuple1.getFieldStart(i),
+                        tuple1.getFieldLength(i), tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i), tupleToBeInserted.getFieldLength(i));
+                if (c < 0) {
+                    pLow1 = keyValueProviders[i].getValue(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                } else {
+                    pLow1 = keyValueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
+                            tupleToBeInserted.getFieldStart(i));
+                }
+
+                c = cmp.getComparators()[j].compare(tuple1.getFieldData(j), tuple1.getFieldStart(j),
+                        tuple1.getFieldLength(j), tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j), tupleToBeInserted.getFieldLength(j));
+                if (c > 0) {
+                    pHigh1 = keyValueProviders[j].getValue(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+                } else {
+                    pHigh1 = keyValueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
+                            tupleToBeInserted.getFieldStart(j));
+                }
+            } else {
+                pLow1 = keyValueProviders[i].getValue(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                pHigh1 = keyValueProviders[j].getValue(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+            }
+
+            double pLow2 = keyValueProviders[i].getValue(tuple2.getFieldData(i), tuple2.getFieldStart(i));
+            double pHigh2 = keyValueProviders[j].getValue(tuple2.getFieldData(j), tuple2.getFieldStart(j));
+
+            if (pLow1 > pHigh2 || pHigh1 < pLow2) {
+                return 0.0;
+            }
+
+            f1 = Math.max(pLow1, pLow2);
+            f2 = Math.min(pHigh1, pHigh2);
+            area *= f2 - f1;
+        }
+        return area;
+    }
+
+    private double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
+        double areaBeforeEnlarge = area(tuple, cmp);
+        double areaAfterEnlarge = 1.0;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh, pLow;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), tupleToBeInserted.getFieldData(i), tupleToBeInserted.getFieldStart(i),
+                    tupleToBeInserted.getFieldLength(i));
+            if (c < 0) {
+                pLow = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
+            } else {
+                pLow = keyValueProviders[i].getValue(tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i));
+            }
+
+            c = cmp.getComparators()[j].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    tupleToBeInserted.getFieldData(j), tupleToBeInserted.getFieldStart(j),
+                    tupleToBeInserted.getFieldLength(j));
+            if (c > 0) {
+                pHigh = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
+            } else {
+                pHigh = keyValueProviders[j].getValue(tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j));
+            }
+            areaAfterEnlarge *= pHigh - pLow;
+        }
+        return areaAfterEnlarge - areaBeforeEnlarge;
+    }
+
+    private double area(ITupleReference tuple, MultiComparator cmp) {
+        double area = 1.0;
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            area *= keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j))
+                    - keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
+        }
+        return area;
+    }
+
+    /**
+     * Tells whether the entry {@code frameTuple} currently points at would have to grow to cover
+     * {@code tuple}. This method does not reposition {@code frameTuple}.
+     *
+     * @return {@code true} if, in some dimension, the entry's low side compares above the tuple's
+     *         low side or the entry's high side compares below the tuple's high side
+     */
+    @Override
+    public boolean checkEnlargement(ITupleReference tuple, MultiComparator cmp) {
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int low = 0, high = dims; low < dims; low++, high++) {
+            final int lowOrder = cmp.getComparators()[low].compare(frameTuple.getFieldData(low),
+                    frameTuple.getFieldStart(low), frameTuple.getFieldLength(low), tuple.getFieldData(low),
+                    tuple.getFieldStart(low), tuple.getFieldLength(low));
+            if (lowOrder > 0) {
+                return true;
+            }
+
+            final int highOrder = cmp.getComparators()[high].compare(frameTuple.getFieldData(high),
+                    frameTuple.getFieldStart(high), frameTuple.getFieldLength(high), tuple.getFieldData(high),
+                    tuple.getFieldStart(high), tuple.getFieldLength(high));
+            if (highOrder < 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Grows the entry {@code frameTuple} currently points at so that it covers {@code tuple}: a
+     * low-side field is overwritten in place when it compares above the tuple's field, a high-side
+     * field when it compares below. This method does not reposition {@code frameTuple}.
+     */
+    @Override
+    public void enlarge(ITupleReference tuple, MultiComparator cmp) {
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int low = 0, high = dims; low < dims; low++, high++) {
+            final int lowOrder = cmp.getComparators()[low].compare(frameTuple.getFieldData(low),
+                    frameTuple.getFieldStart(low), frameTuple.getFieldLength(low), tuple.getFieldData(low),
+                    tuple.getFieldStart(low), tuple.getFieldLength(low));
+            if (lowOrder > 0) {
+                // the tuple reaches further down: take over its low-side bytes
+                System.arraycopy(tuple.getFieldData(low), tuple.getFieldStart(low), frameTuple.getFieldData(low),
+                        frameTuple.getFieldStart(low), tuple.getFieldLength(low));
+            }
+
+            final int highOrder = cmp.getComparators()[high].compare(frameTuple.getFieldData(high),
+                    frameTuple.getFieldStart(high), frameTuple.getFieldLength(high), tuple.getFieldData(high),
+                    tuple.getFieldStart(high), tuple.getFieldLength(high));
+            if (highOrder < 0) {
+                // the tuple reaches further up: take over its high-side bytes
+                System.arraycopy(tuple.getFieldData(high), tuple.getFieldStart(high), frameTuple.getFieldData(high),
+                        frameTuple.getFieldStart(high), tuple.getFieldLength(high));
+            }
+        }
+    }
+
+    /**
+     * For debugging: collects the child page id of every entry of this frame, in slot order.
+     *
+     * @param cmp
+     *            comparator supplying the key field count
+     * @return the child page ids read from behind the last field of each entry
+     */
+    public ArrayList<Integer> getChildren(MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        final ArrayList<Integer> childPageIds = new ArrayList<Integer>();
+        final int tupleCount = buf.getInt(tupleCountOff);
+        for (int idx = 0; idx < tupleCount; idx++) {
+            frameTuple.resetByTupleOffset(buf, slotManager.getTupleOff(slotManager.getSlotOff(idx)));
+            final int childPageId = IntegerSerializerDeserializer.getInt(buf.array(),
+                    getChildPointerOff(frameTuple));
+            childPageIds.add(childPageId);
+        }
+        return childPageIds;
+    }
+
+    /** Returns the number of key value providers of this frame. */
+    @Override
+    public int getFieldCount() {
+        return keyValueProviders.length;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
new file mode 100644
index 0000000..943a179
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+
+/**
+ * Factory for {@link RTreeNSMInteriorFrame} instances. Every created frame gets its own tuple
+ * writer and its own set of primitive value providers.
+ */
+public class RTreeNSMInteriorFrameFactory implements ITreeIndexFrameFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final ITreeIndexTupleWriterFactory tupleWriterFactory;
+    private final IPrimitiveValueProviderFactory[] keyValueProviderFactories;
+
+    /**
+     * @param tupleWriterFactory
+     *            creates the tuple writer of each frame
+     * @param keyValueProviderFactories
+     *            one factory per key field; the count must be even
+     * @throws IllegalArgumentException
+     *             if the number of key value provider factories is odd
+     */
+    public RTreeNSMInteriorFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory,
+            IPrimitiveValueProviderFactory[] keyValueProviderFactories) {
+        final boolean oddFieldCount = keyValueProviderFactories.length % 2 != 0;
+        if (oddFieldCount) {
+            throw new IllegalArgumentException("The key has different number of dimensions.");
+        }
+        this.tupleWriterFactory = tupleWriterFactory;
+        this.keyValueProviderFactories = keyValueProviderFactories;
+    }
+
+    /** Creates a new interior frame with a fresh tuple writer and fresh value providers. */
+    @Override
+    public IRTreeInteriorFrame createFrame() {
+        final IPrimitiveValueProvider[] providers = new IPrimitiveValueProvider[keyValueProviderFactories.length];
+        int next = 0;
+        for (IPrimitiveValueProviderFactory providerFactory : keyValueProviderFactories) {
+            providers[next++] = providerFactory.createPrimitiveValueProvider();
+        }
+        return new RTreeNSMInteriorFrame(tupleWriterFactory.createTupleWriter(), providers);
+    }
+
+    /** Returns the tuple writer factory given to the constructor. */
+    @Override
+    public ITreeIndexTupleWriterFactory getTupleWriterFactory() {
+        return tupleWriterFactory;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
new file mode 100644
index 0000000..f1d71ff
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+
+public class RTreeNSMLeafFrame extends RTreeNSMFrame implements IRTreeLeafFrame {
+
+    public RTreeNSMLeafFrame(ITreeIndexTupleWriter tupleWriter, IPrimitiveValueProvider[] keyValueProviders) {
+        super(tupleWriter, keyValueProviders);
+    }
+
+    @Override
+    public ITreeIndexTupleReference createTupleReference() {
+        return tupleWriter.createTupleReference();
+    }
+
+    @Override
+    public int findTupleIndex(ITupleReference tuple, MultiComparator cmp) {
+        return slotManager.findTupleIndex(tuple, frameTuple, cmp, null, null);
+    }
+
+    @Override
+    public boolean intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                    frameTuple.getFieldLength(j));
+            if (c > 0) {
+                return false;
+            }
+            c = cmp.getComparators()[i].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+
+            if (c < 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public int getTupleSize(ITupleReference tuple) {
+        return tupleWriter.bytesRequired(tuple);
+    }
+
+    @Override
+    public void insert(ITupleReference tuple, int tupleIndex) {
+        slotManager.insertSlot(-1, buf.getInt(freeSpaceOff));
+        int bytesWritten = tupleWriter.writeTuple(tuple, buf.array(), buf.getInt(freeSpaceOff));
+
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) + 1);
+        buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + bytesWritten);
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - bytesWritten - slotManager.getSlotSize());
+    }
+
+    @Override
+    public void delete(int tupleIndex, MultiComparator cmp) {
+        int slotOff = slotManager.getSlotOff(tupleIndex);
+
+        int tupleOff = slotManager.getTupleOff(slotOff);
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int tupleSize = tupleWriter.bytesRequired(frameTuple);
+
+        // perform deletion (we just do a memcpy to overwrite the slot)
+        int slotStartOff = slotManager.getSlotEndOff();
+        int length = slotOff - slotStartOff;
+        System.arraycopy(buf.array(), slotStartOff, buf.array(), slotStartOff + slotManager.getSlotSize(), length);
+
+        // maintain space information
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) - 1);
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + tupleSize + slotManager.getSlotSize());
+    }
+
+    @Override
+    public int getFieldCount() {
+        return frameTuple.getFieldCount();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
new file mode 100644
index 0000000..e31148f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+
+public class RTreeNSMLeafFrameFactory implements ITreeIndexFrameFactory {
+    // Creates RTreeNSMLeafFrame objects from a tuple-writer factory plus one value-provider factory per key field.
+    private static final long serialVersionUID = 1L;
+    private final ITreeIndexTupleWriterFactory tupleWriterFactory;
+    private final IPrimitiveValueProviderFactory[] keyValueProviderFactories;
+
+    public RTreeNSMLeafFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory, IPrimitiveValueProviderFactory[] keyValueProviderFactories) {
+        if (keyValueProviderFactories.length % 2 != 0) { // an odd number of key fields is rejected
+            throw new IllegalArgumentException("The key has different number of dimensions.");
+        }
+        this.keyValueProviderFactories = keyValueProviderFactories;
+        this.tupleWriterFactory = tupleWriterFactory;
+    }
+
+    @Override
+    public IRTreeLeafFrame createFrame() {
+        final IPrimitiveValueProvider[] providers = new IPrimitiveValueProvider[keyValueProviderFactories.length];
+        for (int field = 0; field < providers.length; field++) {
+            providers[field] = keyValueProviderFactories[field].createPrimitiveValueProvider();
+        }
+        return new RTreeNSMLeafFrame(tupleWriterFactory.createTupleWriter(), providers);
+    }
+
+    @Override
+    public ITreeIndexTupleWriterFactory getTupleWriterFactory() {
+        return tupleWriterFactory;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/DoublePrimitiveValueProviderFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/DoublePrimitiveValueProviderFactory.java
new file mode 100644
index 0000000..a590ef5
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/DoublePrimitiveValueProviderFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+public class DoublePrimitiveValueProviderFactory implements IPrimitiveValueProviderFactory {
+	private static final long serialVersionUID = 1L;
+	// Singleton: the constructor is private, so INSTANCE is the only instance this class creates.
+	public static final DoublePrimitiveValueProviderFactory INSTANCE = new DoublePrimitiveValueProviderFactory();
+
+	private DoublePrimitiveValueProviderFactory() {
+	}
+
+	@Override
+	public IPrimitiveValueProvider createPrimitiveValueProvider() {
+		return new IPrimitiveValueProvider() {
+			@Override
+			public double getValue(byte[] bytes, int offset) {
+				final double decoded = DoubleSerializerDeserializer.getDouble(bytes, offset);
+				return decoded;
+			}
+		};
+	}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java
new file mode 100644
index 0000000..91fca6a
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+public enum EntriesOrder {
+	ASCENDING, DESCENDING // presumably the direction in which entries are ordered; the users of this enum are not visible here
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/FloatPrimitiveValueProviderFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/FloatPrimitiveValueProviderFactory.java
new file mode 100644
index 0000000..605f73a
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/FloatPrimitiveValueProviderFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+public class FloatPrimitiveValueProviderFactory implements IPrimitiveValueProviderFactory {
+	private static final long serialVersionUID = 1L;
+	// Singleton: the constructor is private, so INSTANCE is the only instance this class creates.
+	public static final FloatPrimitiveValueProviderFactory INSTANCE = new FloatPrimitiveValueProviderFactory();
+
+	private FloatPrimitiveValueProviderFactory() {
+	}
+
+	@Override
+	public IPrimitiveValueProvider createPrimitiveValueProvider() {
+		return new IPrimitiveValueProvider() {
+			@Override
+			public double getValue(byte[] bytes, int offset) {
+				final double asDouble = FloatSerializerDeserializer.getFloat(bytes, offset);
+				return asDouble;
+			}
+		};
+	}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/IntegerPrimitiveValueProviderFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/IntegerPrimitiveValueProviderFactory.java
new file mode 100644
index 0000000..d1457db
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/IntegerPrimitiveValueProviderFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+public class IntegerPrimitiveValueProviderFactory implements IPrimitiveValueProviderFactory {
+	private static final long serialVersionUID = 1L;
+	// Singleton: the constructor is private, so INSTANCE is the only instance this class creates.
+	public static final IntegerPrimitiveValueProviderFactory INSTANCE = new IntegerPrimitiveValueProviderFactory();
+
+	private IntegerPrimitiveValueProviderFactory() {
+	}
+
+	@Override
+	public IPrimitiveValueProvider createPrimitiveValueProvider() {
+		return new IPrimitiveValueProvider() {
+			@Override
+			public double getValue(byte[] bytes, int offset) {
+				final double asDouble = IntegerSerializerDeserializer.getInt(bytes, offset);
+				return asDouble;
+			}
+		};
+	}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/PathList.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/PathList.java
new file mode 100644
index 0000000..911029d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/PathList.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IntArrayList;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.LongArrayList;
+
+public class PathList {
+    private final IntArrayList pageIds;
+    private final LongArrayList pageLsns;
+    private final IntArrayList pageIndexes;
+    /** Creates three parallel lists (page ids, page LSNs, page indexes) with identical capacity settings. */
+    public PathList(int initialCapacity, int growth) {
+        pageIds = new IntArrayList(initialCapacity, growth);
+        pageLsns = new LongArrayList(initialCapacity, growth);
+        pageIndexes = new IntArrayList(initialCapacity, growth);
+    }
+    /** Returns the number of entries, taken from the page-id list. */
+    public int size() {
+        return pageIds.size();
+    }
+    /** Delegates to {@code pageIds.first()}; the meaning of that value is defined by IntArrayList. */
+    public int first() {
+        return pageIds.first();
+    }
+    /** Adds one (page id, LSN, index) entry by calling {@code add} on all three lists. */
+    public void add(int pageId, long pageLsn, int pageIndex) {
+        pageIds.add(pageId);
+        pageLsns.add(pageLsn);
+        pageIndexes.add(pageIndex);
+    }
+    /** Adds one (page id, LSN, index) entry by calling {@code addFirst} on all three lists. */
+    public void addFirst(int pageId, long pageLsn, int pageIndex) {
+        pageIds.addFirst(pageId);
+        pageLsns.addFirst(pageLsn);
+        pageIndexes.addFirst(pageIndex);
+    }
+    /** Returns the page id reported by {@code pageIds.getFirst()}. */
+    public int getFirstPageId() {
+        return pageIds.getFirst();
+    }
+    /** Returns the LSN reported by {@code pageLsns.getFirst()}. */
+    public long getFirstPageLsn() {
+        return pageLsns.getFirst();
+    }
+    /** Returns the page index reported by {@code pageIndexes.getFirst()}. */
+    public int getFirstPageIndex() {
+        return pageIndexes.getFirst();
+    }
+    /** Returns the page id reported by {@code pageIds.getLast()}. */
+    public int getLastPageId() {
+        return pageIds.getLast();
+    }
+    /** Returns the LSN reported by {@code pageLsns.getLast()}. */
+    public long getLastPageLsn() {
+        return pageLsns.getLast();
+    }
+    /** Returns the page index reported by {@code pageIndexes.getLast()}. */
+    public int getLastPageIndex() {
+        return pageIndexes.getLast();
+    }
+    /** Returns the page id stored at position {@code i}. */
+    public int getPageId(int i) {
+        return pageIds.get(i);
+    }
+    /** Returns the LSN stored at position {@code i}. */
+    public long getPageLsn(int i) {
+        return pageLsns.get(i);
+    }
+    /** Returns the page index stored at position {@code i}. */
+    public int getPageIndex(int i) {
+        return pageIndexes.get(i);
+    }
+    /** Overwrites the LSN stored at position {@code i}; the other two lists are untouched. */
+    public void setPageLsn(int i, long pageLsn) {
+        pageLsns.set(i, pageLsn);
+    }
+    /** Calls {@code moveFirst()} on all three lists so they stay aligned. */
+    public void moveFirst() {
+        pageIds.moveFirst();
+        pageLsns.moveFirst();
+        pageIndexes.moveFirst();
+    }
+    /** Despite its name, this calls {@code removeLast()} on all three lists, dropping the last entry. */
+    public void moveLast() {
+        pageIds.removeLast();
+        pageLsns.removeLast();
+        pageIndexes.removeLast();
+    }
+    /** Delegates to {@code pageIds.isLast()}; the exact condition is defined by IntArrayList. */
+    public boolean isLast() {
+        return pageIds.isLast();
+    }
+    /** Clears all three lists. */
+    public void clear() {
+        pageIds.clear();
+        pageLsns.clear();
+        pageIndexes.clear();
+    }
+    /** Returns whether the page-id list is empty. */
+    public boolean isEmpty() {
+        return pageIds.isEmpty();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
new file mode 100644
index 0000000..cc3cf5b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -0,0 +1,996 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexType;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import edu.uci.ics.hyracks.storage.am.common.impls.TreeDiskOrderScanCursor;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.util.TreeIndexUtils;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrame;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public class RTree implements ITreeIndex {
+
+    private final int rootPage = 1; // page id of the root, combined with fileId whenever the root is pinned
+    // The NSN counter below is also the source of the page LSN values written at the end of insert().
+    // Global node sequence number used for the concurrency control protocol
+    private final AtomicLong globalNsn;
+    private final ReadWriteLock treeLatch;
+    // Collaborators for page allocation/metadata and for pinning pages.
+    private final IFreePageManager freePageManager;
+    private final IBufferCache bufferCache;
+    private int fileId; // assigned by open(), reset to -1 by close()
+    // Factories for the frame objects and comparators handed to each operation.
+    private final ITreeIndexFrameFactory interiorFrameFactory;
+    private final ITreeIndexFrameFactory leafFrameFactory;
+    private final int fieldCount;
+    private final IBinaryComparatorFactory[] cmpFactories;
+
+    public RTree(IBufferCache bufferCache, int fieldCount, IBinaryComparatorFactory[] cmpFactories,
+            IFreePageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory) {
+        this.globalNsn = new AtomicLong(); // counter starts at zero
+        this.treeLatch = new ReentrantReadWriteLock(true); // true selects the fair ordering policy
+        this.bufferCache = bufferCache;
+        this.freePageManager = freePageManager;
+        this.fieldCount = fieldCount;
+        this.cmpFactories = cmpFactories;
+        this.interiorFrameFactory = interiorFrameFactory;
+        this.leafFrameFactory = leafFrameFactory;
+    }
+
+    private long incrementGlobalNsn() {
+        return globalNsn.incrementAndGet(); // atomically adds one and returns the updated value
+    }
+
+    public byte getTreeHeight(IRTreeLeafFrame leafFrame) throws HyracksDataException { // side effect: leafFrame is pointed at the root page
+        ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+        rootNode.acquireReadLatch();
+        try {
+            leafFrame.setPage(rootNode);
+            return leafFrame.getLevel(); // the level stored in the root page is reported as the tree height
+        } finally {
+            rootNode.releaseReadLatch(); // latch and pin are undone even if reading the level throws
+            bufferCache.unpin(rootNode);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    public String printTree(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame,
+            ISerializerDeserializer[] keySerdes) throws Exception { // renders the tree as text, starting at the root with no parent
+        final MultiComparator comparator = MultiComparator.create(cmpFactories);
+        final byte height = getTreeHeight(leafFrame);
+        final StringBuilder dump = new StringBuilder();
+        printTree(rootPage, null, false, leafFrame, interiorFrame, height, keySerdes, dump, comparator);
+        return dump.toString();
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void printTree(int pageId, ICachedPage parent, boolean unpin, IRTreeLeafFrame leafFrame,
+            IRTreeInteriorFrame interiorFrame, byte treeHeight, ISerializerDeserializer[] keySerdes,
+            StringBuilder strBuilder, MultiComparator cmp) throws Exception {
+        ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+        node.acquireReadLatch();
+        try {
+            if (parent != null && unpin == true) { // the parent is released only after this page has been latched
+                parent.releaseReadLatch();
+                bufferCache.unpin(parent);
+            }
+            interiorFrame.setPage(node);
+            int level = interiorFrame.getLevel();
+            strBuilder.append(String.format("%1d ", level));
+            strBuilder.append(String.format("%3d ", pageId) + ": ");
+            for (int i = 0; i < treeHeight - level; i++) { // four spaces of indent per level below treeHeight
+                strBuilder.append("    ");
+            }
+            // Leaf pages are read through leafFrame, all other pages through interiorFrame.
+            String keyString;
+            long LSN, NSN;
+            int rightPage;
+            if (interiorFrame.isLeaf()) {
+                leafFrame.setPage(node);
+                keyString = TreeIndexUtils.printFrameTuples(leafFrame, keySerdes);
+                LSN = leafFrame.getPageLsn();
+                NSN = leafFrame.getPageNsn();
+                rightPage = leafFrame.getRightPage();
+
+            } else {
+                keyString = TreeIndexUtils.printFrameTuples(interiorFrame, keySerdes);
+                LSN = interiorFrame.getPageLsn();
+                NSN = interiorFrame.getPageNsn();
+                rightPage = interiorFrame.getRightPage();
+            }
+            // One line with the tuples, then one line with the page header values.
+            strBuilder.append(keyString + "\n" + "pageId: " + pageId + " LSN: " + LSN + " NSN: " + NSN + " rightPage: "
+                    + rightPage + "\n");
+            if (!interiorFrame.isLeaf()) { // NOTE(review): with zero children this page would stay latched and pinned — confirm that cannot occur
+                ArrayList<Integer> children = ((RTreeNSMInteriorFrame) (interiorFrame)).getChildren(cmp);
+                for (int i = 0; i < children.size(); i++) { // only the last child call gets unpin == true and releases this page
+                    printTree(children.get(i), node, i == children.size() - 1, leafFrame, interiorFrame, treeHeight,
+                            keySerdes, strBuilder, cmp);
+                }
+            } else {
+                node.releaseReadLatch(); // a leaf has no child to hand the release to, so it releases itself
+                bufferCache.unpin(node);
+            }
+        } catch (Exception e) { // NOTE(review): the exception is printed and swallowed, so callers never see it despite 'throws Exception'
+            node.releaseReadLatch();
+            bufferCache.unpin(node);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void create(int fileId) throws HyracksDataException { // works on the fileId argument only; this.fileId is not assigned here
+        treeLatch.writeLock().lock();
+        try {
+            ITreeIndexFrame leafFrame = leafFrameFactory.createFrame();
+            ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+            freePageManager.open(fileId);
+            freePageManager.init(metaFrame, rootPage);
+            // The root page is set up through a leaf frame and initialized with level 0.
+            // initialize root page
+            ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+            // Write-latch the root while its buffer is initialized; latch and pin are released in the finally block.
+            rootNode.acquireWriteLatch();
+            try {
+                leafFrame.setPage(rootNode);
+                leafFrame.initBuffer((byte) 0);
+            } finally {
+                rootNode.releaseWriteLatch();
+                bufferCache.unpin(rootNode);
+            }
+        } finally {
+            treeLatch.writeLock().unlock(); // the tree-wide write lock is held for the whole creation
+        }
+    }
+
+    @Override
+    public void open(int fileId) {
+        this.fileId = fileId; // used afterwards to build the disk page ids for every pin
+        freePageManager.open(fileId);
+    }
+
+    @Override
+    public void close() {
+        fileId = -1; // replaces the id that open() stored
+        freePageManager.close();
+    }
+
+    @Override
+    public int getFileId() {
+        return fileId; // the value stored by open(), or -1 once close() has run
+    }
+
+    @Override
+    public IBufferCache getBufferCache() {
+        return bufferCache; // the cache instance that was passed to the constructor
+    }
+
+    private RTreeOpContext createOpContext() { // each call creates new leaf, interior and metadata frame objects
+        return new RTreeOpContext((IRTreeLeafFrame) leafFrameFactory.createFrame(),
+                (IRTreeInteriorFrame) interiorFrameFactory.createFrame(), freePageManager.getMetaDataFrameFactory()
+                        .createFrame(), cmpFactories, 8); // NOTE(review): the meaning of the literal 8 is defined by RTreeOpContext — confirm
+    }
+
+    private void insert(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException, TreeIndexException {
+        RTreeOpContext ctx = (RTreeOpContext) ictx;
+        ctx.reset();
+        ctx.setTuple(tuple);
+        ctx.splitKey.reset();
+        ctx.splitKey.getLeftTuple().setFieldCount(cmpFactories.length);
+        ctx.splitKey.getRightTuple().setFieldCount(cmpFactories.length);
+
+        int maxFieldPos = cmpFactories.length / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = ctx.cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j));
+            if (c > 0) {
+                throw new IllegalArgumentException("The low key point has larger coordinates than the high key point.");
+            }
+        }
+        try {
+            ICachedPage leafNode = findLeaf(ctx);
+
+            int pageId = ctx.pathList.getLastPageId();
+            ctx.pathList.moveLast();
+            insertTuple(leafNode, pageId, ctx.getTuple(), ctx, true);
+
+            while (true) {
+                if (ctx.splitKey.getLeftPageBuffer() != null) {
+                    updateParentForInsert(ctx);
+                } else {
+                    break;
+                }
+            }
+        } finally {
+            for (int i = ctx.NSNUpdates.size() - 1; i >= 0; i--) {
+                ICachedPage node = ctx.NSNUpdates.get(i);
+                ctx.interiorFrame.setPage(node);
+                ctx.interiorFrame.setPageNsn(incrementGlobalNsn());
+            }
+
+            for (int i = ctx.LSNUpdates.size() - 1; i >= 0; i--) {
+                ICachedPage node = ctx.LSNUpdates.get(i);
+                ctx.interiorFrame.setPage(node);
+                ctx.interiorFrame.setPageLsn(incrementGlobalNsn());
+                node.releaseWriteLatch();
+                bufferCache.unpin(node);
+            }
+        }
+    }
+
+    private ICachedPage findLeaf(RTreeOpContext ctx) throws HyracksDataException {
+        int pageId = rootPage;
+        boolean writeLatched = false;
+        boolean readLatched = false;
+        boolean succeeded = false;
+        ICachedPage node = null;
+        boolean isLeaf = false;
+        long pageLsn = 0, parentLsn = 0;
+        // Walks from the root towards a leaf for ctx.getTuple(), recording every visited page in ctx.pathList.
+        try {
+            // On success the leaf is returned still pinned and write-latched; releasing it is left to the caller.
+            while (true) {
+                if (!writeLatched) { // a write latch kept from a retry means the page is already pinned and latched
+                    node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                    ctx.interiorFrame.setPage(node);
+                    isLeaf = ctx.interiorFrame.isLeaf(); // read before any latch is held; re-checked below under the write latch
+                    if (isLeaf) {
+                        node.acquireWriteLatch();
+                        writeLatched = true;
+                        // Re-check under the latch: if the page is no longer a leaf, drop it and retry the same pageId.
+                        if (!ctx.interiorFrame.isLeaf()) {
+                            node.releaseWriteLatch();
+                            writeLatched = false;
+                            bufferCache.unpin(node);
+                            continue;
+                        }
+                    } else {
+                        // Be optimistic and grab read latch first. We will swap
+                        // it to write latch if we need to enlarge the best
+                        // child tuple.
+                        node.acquireReadLatch();
+                        readLatched = true;
+                    }
+                }
+                // parentLsn is the LSN the parent had when this child was chosen (assigned at the end of the loop body).
+                if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                    // Concurrent split detected, go back to parent and
+                    // re-choose the best child
+                    if (writeLatched) {
+                        node.releaseWriteLatch();
+                        writeLatched = false;
+                        bufferCache.unpin(node);
+                    } else {
+                        node.releaseReadLatch();
+                        readLatched = false;
+                        bufferCache.unpin(node);
+                    }
+                    // The parent is the last path entry: revisit it and pop it so it is re-added on the next pass.
+                    pageId = ctx.pathList.getLastPageId();
+                    if (pageId != rootPage) {
+                        parentLsn = ctx.pathList.getPageLsn(ctx.pathList.size() - 2);
+                    }
+                    ctx.pathList.moveLast();
+                    continue;
+                }
+
+                pageLsn = ctx.interiorFrame.getPageLsn();
+                ctx.pathList.add(pageId, pageLsn, -1); // the page index is recorded as -1
+
+                if (!isLeaf) {
+                    // findBestChild must be called *before* getBestChildPageId
+                    boolean enlarementIsNeeded = ctx.interiorFrame.findBestChild(ctx.getTuple(), ctx.cmp);
+                    int childPageId = ctx.interiorFrame.getBestChildPageId();
+
+                    if (enlarementIsNeeded) {
+                        if (!writeLatched) {
+                            node.releaseReadLatch();
+                            readLatched = false;
+                            bufferCache.unpin(node);
+                            // No latch is held between the release above and the write latch below, hence the LSN check.
+                            node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                            node.acquireWriteLatch();
+                            writeLatched = true;
+                            ctx.interiorFrame.setPage(node);
+
+                            if (ctx.interiorFrame.getPageLsn() != pageLsn) {
+                                // The page was changed while we unlocked it;
+                                // thus, retry (re-choose best child)
+                                // The write latch stays held, so the next iteration skips the pin step.
+                                ctx.pathList.moveLast(); // drop the entry added above; it is added again on the retry
+                                continue;
+                            }
+                        }
+                        // We don't need to reset the frameTuple because it is
+                        // already pointing to the best child
+                        ctx.interiorFrame.enlarge(ctx.getTuple(), ctx.cmp);
+
+                        node.releaseWriteLatch();
+                        writeLatched = false;
+                        bufferCache.unpin(node);
+                    } else {
+                        if (readLatched) {
+                            node.releaseReadLatch();
+                            readLatched = false;
+                            bufferCache.unpin(node);
+                        } else if (writeLatched) {
+                            node.releaseWriteLatch();
+                            writeLatched = false;
+                            bufferCache.unpin(node);
+                        }
+                    }
+                    // Descend: the chosen child becomes the current page and this page's LSN becomes its parentLsn.
+                    pageId = childPageId;
+                    parentLsn = pageLsn;
+                } else {
+                    ctx.leafFrame.setPage(node);
+                    succeeded = true;
+                    return node;
+                }
+            }
+        } finally {
+            if (!succeeded) { // failure path only: release whichever latch the flags say is still held
+                if (readLatched) { // NOTE(review): a page pinned but not yet latched when an exception hits is not unpinned here — confirm
+                    node.releaseReadLatch();
+                    readLatched = false;
+                    bufferCache.unpin(node);
+                } else if (writeLatched) {
+                    node.releaseWriteLatch();
+                    writeLatched = false;
+                    bufferCache.unpin(node);
+                }
+            }
+        }
+    }
+
+    /**
+     * Inserts {@code tuple} into the page {@code node}, splitting the page when it is full.
+     * <p>
+     * The method works on {@code ctx.interiorFrame} (when {@code isLeaf} is false) or {@code ctx.leafFrame}
+     * (when {@code isLeaf} is true) without setting their page first, so the caller is expected to have
+     * pointed the matching frame at {@code node} and to hold a pin and a write latch on it.
+     * <ul>
+     * <li>Enough space (possibly after compaction): the tuple is inserted, {@code node} is appended to
+     * {@code ctx.LSNUpdates} and {@code ctx.splitKey} is reset.</li>
+     * <li>Not enough space: a new right page is taken from the free page manager, the frame is split into it
+     * and {@code ctx.splitKey} is filled. Both pages are appended to {@code ctx.NSNUpdates} and
+     * {@code ctx.LSNUpdates}. If {@code pageId} is the root, the old root content is copied to another new
+     * page and {@code node} is re-initialized as the new root one level higher.</li>
+     * </ul>
+     * Pages recorded in the update lists are not un-latched or unpinned here. On failure, newly pinned pages
+     * are always released, but {@code node} itself is released only when {@code isLeaf} is true.
+     *
+     * @param node   page to insert into
+     * @param pageId page id of {@code node}
+     * @param tuple  tuple to insert
+     * @param ctx    operation context holding the frames, the split key and the update lists
+     * @param isLeaf whether {@code node} is handled through the leaf frame
+     */
+    private void insertTuple(ICachedPage node, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
+            throws HyracksDataException, TreeIndexException {
+        boolean succeeded = false;
+        FrameOpSpaceStatus spaceStatus;
+        if (!isLeaf) {
+            spaceStatus = ctx.interiorFrame.hasSpaceInsert(tuple);
+        } else {
+            spaceStatus = ctx.leafFrame.hasSpaceInsert(tuple);
+        }
+
+        switch (spaceStatus) {
+            case SUFFICIENT_CONTIGUOUS_SPACE: {
+                // The tuple fits as-is.
+                try {
+                    if (!isLeaf) {
+                        ctx.interiorFrame.insert(tuple, -1);
+                    } else {
+                        ctx.leafFrame.insert(tuple, -1);
+                    }
+                    succeeded = true;
+                } finally {
+                    if (succeeded) {
+                        ctx.LSNUpdates.add(node);
+                        ctx.splitKey.reset();
+                    } else if (isLeaf) {
+                        // On failure only a leaf is released here; an interior node is
+                        // un-latched by updateParentForInsert.
+                        node.releaseWriteLatch();
+                        bufferCache.unpin(node);
+                    }
+                }
+                break;
+            }
+
+            case SUFFICIENT_SPACE: {
+                // The tuple fits once the page has been compacted.
+                try {
+                    if (!isLeaf) {
+                        ctx.interiorFrame.compact();
+                        ctx.interiorFrame.insert(tuple, -1);
+                    } else {
+                        ctx.leafFrame.compact();
+                        ctx.leafFrame.insert(tuple, -1);
+                    }
+                    succeeded = true;
+                } finally {
+                    if (succeeded) {
+                        ctx.LSNUpdates.add(node);
+                        ctx.splitKey.reset();
+                    } else if (isLeaf) {
+                        // On failure only a leaf is released here; an interior node is
+                        // un-latched by updateParentForInsert.
+                        node.releaseWriteLatch();
+                        bufferCache.unpin(node);
+                    }
+                }
+                break;
+            }
+
+            case INSUFFICIENT_SPACE: {
+                // Split: allocate, pin and write-latch a new right sibling.
+                int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
+                ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
+                rightNode.acquireWriteLatch();
+
+                try {
+                    IRTreeFrame rightFrame;
+                    if (!isLeaf) {
+                        rightFrame = (IRTreeFrame) interiorFrameFactory.createFrame();
+                        rightFrame.setPage(rightNode);
+                        rightFrame.initBuffer((byte) ctx.interiorFrame.getLevel());
+                        // The new page takes over the old right link, then the old page links to it.
+                        rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
+                        ctx.interiorFrame.split(rightFrame, tuple, ctx.splitKey);
+                        ctx.interiorFrame.setRightPage(rightPageId);
+                    } else {
+                        rightFrame = (IRTreeFrame) leafFrameFactory.createFrame();
+                        rightFrame.setPage(rightNode);
+                        rightFrame.initBuffer((byte) 0);
+                        // NOTE(review): the right link is read through ctx.interiorFrame although this is
+                        // the leaf branch; presumably both frames point at the same page here - confirm.
+                        rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
+                        ctx.leafFrame.split(rightFrame, tuple, ctx.splitKey);
+                        ctx.leafFrame.setRightPage(rightPageId);
+                    }
+                    succeeded = true;
+                } finally {
+                    if (succeeded) {
+                        ctx.NSNUpdates.add(rightNode);
+                        ctx.LSNUpdates.add(rightNode);
+                        ctx.NSNUpdates.add(node);
+                        ctx.LSNUpdates.add(node);
+                    } else if (isLeaf) {
+                        // On failure only a leaf is released here; an interior node is
+                        // un-latched by updateParentForInsert.
+                        node.releaseWriteLatch();
+                        bufferCache.unpin(node);
+                        rightNode.releaseWriteLatch();
+                        bufferCache.unpin(rightNode);
+                    } else {
+                        rightNode.releaseWriteLatch();
+                        bufferCache.unpin(rightNode);
+                    }
+
+                }
+                ctx.splitKey.setPages(pageId, rightPageId);
+                if (pageId == rootPage) {
+                    // Root split: the root keeps its page id, so its old content moves to a new page.
+                    int newLeftId = freePageManager.getFreePage(ctx.metaFrame);
+                    ICachedPage newLeftNode = bufferCache
+                            .pin(BufferedFileHandle.getDiskPageId(fileId, newLeftId), true);
+                    newLeftNode.acquireWriteLatch();
+                    succeeded = false;
+                    try {
+                        // copy left child to new left child
+                        System.arraycopy(node.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0, newLeftNode
+                                .getBuffer().capacity());
+
+                        // initialize new root (leftNode becomes new root)
+                        ctx.interiorFrame.setPage(node);
+                        ctx.interiorFrame.initBuffer((byte) (ctx.interiorFrame.getLevel() + 1));
+
+                        ctx.splitKey.setLeftPage(newLeftId);
+                        ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), -1);
+                        ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), -1);
+
+                        succeeded = true;
+                    } finally {
+                        if (succeeded) {
+                            // Drop the entry for node added after the split above, so that the lists
+                            // end with newLeftNode followed by node.
+                            ctx.NSNUpdates.remove(ctx.NSNUpdates.size() - 1);
+                            ctx.LSNUpdates.remove(ctx.LSNUpdates.size() - 1);
+
+                            ctx.NSNUpdates.add(newLeftNode);
+                            ctx.LSNUpdates.add(newLeftNode);
+
+                            ctx.NSNUpdates.add(node);
+                            ctx.LSNUpdates.add(node);
+                            ctx.splitKey.reset();
+                        } else if (isLeaf) {
+                            // On failure only a leaf is released here; an interior node is
+                            // un-latched by updateParentForInsert.
+                            node.releaseWriteLatch();
+                            bufferCache.unpin(node);
+                            rightNode.releaseWriteLatch();
+                            bufferCache.unpin(rightNode);
+                            newLeftNode.releaseWriteLatch();
+                            bufferCache.unpin(newLeftNode);
+                        } else {
+                            rightNode.releaseWriteLatch();
+                            bufferCache.unpin(rightNode);
+                            newLeftNode.releaseWriteLatch();
+                            bufferCache.unpin(newLeftNode);
+                        }
+                    }
+                }
+                break;
+            }
+        }
+    }
+
+    /**
+     * Applies the split described by {@code ctx.splitKey} to the parent page.
+     * <p>
+     * The candidate parent is the last page in {@code ctx.pathList}. It is pinned and write-latched; if its
+     * page LSN differs from the one recorded in the path list, the right-sibling chain is followed until a
+     * page holding a pointer for the split key's left tuple is found. When the parent is found, its key for
+     * the left tuple is adjusted, the right tuple is inserted via {@link #insertTuple}, and the path list
+     * moves to the previous entry; the parent is left latched and pinned in that case. When the chain ends
+     * without a match, the path is recomputed with {@link #findPath} and this method calls itself again.
+     *
+     * @param ctx operation context holding the path list, the split key and the interior frame
+     */
+    private void updateParentForInsert(RTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
+        boolean succeeded = false;
+        boolean writeLatched = false;
+        int parentId = ctx.pathList.getLastPageId();
+        ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+        parentNode.acquireWriteLatch();
+        writeLatched = true;
+        ctx.interiorFrame.setPage(parentNode);
+        boolean foundParent = true;
+
+        try {
+            // A changed LSN means the page was modified since it was recorded in the path list.
+            if (ctx.interiorFrame.getPageLsn() != ctx.pathList.getLastPageLsn()) {
+                foundParent = false;
+                while (true) {
+                    if (ctx.interiorFrame.findTupleByPointer(ctx.splitKey.getLeftTuple(), ctx.cmp) != -1) {
+                        // found the parent
+                        foundParent = true;
+                        break;
+                    }
+                    // Not on this page: read the right link before releasing the page.
+                    int rightPage = ctx.interiorFrame.getRightPage();
+                    parentNode.releaseWriteLatch();
+                    writeLatched = false;
+                    bufferCache.unpin(parentNode);
+
+                    if (rightPage == -1) {
+                        // End of the sibling chain; no page is latched at this point.
+                        break;
+                    }
+
+                    parentId = rightPage;
+                    parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+                    parentNode.acquireWriteLatch();
+                    writeLatched = true;
+                    ctx.interiorFrame.setPage(parentNode);
+                }
+            }
+
+            if (foundParent) {
+                try {
+                    ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), -1, ctx.cmp);
+                } catch (TreeIndexException e) {
+                    // Release here and clear the flag so the outer finally does not release twice.
+                    if (writeLatched) {
+                        parentNode.releaseWriteLatch();
+                        writeLatched = false;
+                        bufferCache.unpin(parentNode);
+                    }
+                    throw e;
+                }
+                insertTuple(parentNode, parentId, ctx.splitKey.getRightTuple(), ctx, ctx.interiorFrame.isLeaf());
+                ctx.pathList.moveLast();
+                succeeded = true;
+                return;
+
+            }
+        } finally {
+            // On success the parent stays latched and pinned; otherwise release it if still held.
+            if (!succeeded) {
+                if (writeLatched) {
+                    parentNode.releaseWriteLatch();
+                    writeLatched = false;
+                    bufferCache.unpin(parentNode);
+                }
+            }
+        }
+
+        // Parent not found along the sibling chain: rebuild the path from the root and retry.
+        ctx.traverseList.clear();
+        findPath(ctx);
+        updateParentForInsert(ctx);
+    }
+
+    /**
+     * Searches the interior pages, starting at the root, for the page that holds a pointer for the left
+     * tuple of {@code ctx.splitKey}, using {@code ctx.traverseList} as the work list.
+     * <p>
+     * Each visited page is pinned and read-latched, and its LSN is recorded in the traverse list. When the
+     * pointer is found, {@code ctx.pathList} is cleared and refilled through {@link #fillPath}. The page that
+     * is latched when the method exits, normally or exceptionally, is released in the {@code finally} block.
+     *
+     * @param ctx operation context holding the traverse list, the path list and the split key
+     * @throws TreeIndexException if a leaf page is reached during the search
+     */
+    private void findPath(RTreeOpContext ctx) throws TreeIndexException, HyracksDataException {
+        boolean readLatched = false;
+        int pageId = rootPage;
+        int parentIndex = -1;
+        long parentLsn = 0;
+        long pageLsn;
+        int pageIndex;
+        ICachedPage node = null;
+        // Seed the work list with the root; -1 marks "no parent".
+        ctx.traverseList.add(pageId, -1, parentIndex);
+        try {
+            while (!ctx.traverseList.isLast()) {
+                pageId = ctx.traverseList.getFirstPageId();
+                parentIndex = ctx.traverseList.getFirstPageIndex();
+
+                node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                node.acquireReadLatch();
+                readLatched = true;
+                ctx.interiorFrame.setPage(node);
+                pageLsn = ctx.interiorFrame.getPageLsn();
+                pageIndex = ctx.traverseList.first();
+                ctx.traverseList.setPageLsn(pageIndex, pageLsn);
+
+                ctx.traverseList.moveFirst();
+
+                if (ctx.interiorFrame.isLeaf()) {
+                    throw new TreeIndexException("Error: Failed to re-find parent of a page in the tree.");
+                }
+
+                if (pageId != rootPage) {
+                    parentLsn = ctx.traverseList.getPageLsn(ctx.traverseList.getPageIndex(pageIndex));
+                }
+                // An NSN above the parent's recorded LSN is treated as a split the parent entry does
+                // not reflect yet, so the right sibling is queued as well.
+                if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                    int rightPage = ctx.interiorFrame.getRightPage();
+                    if (rightPage != -1) {
+                        ctx.traverseList.addFirst(rightPage, -1, parentIndex);
+                    }
+                }
+
+                if (ctx.interiorFrame.findTupleByPointer(ctx.splitKey.getLeftTuple(), ctx.traverseList, pageIndex,
+                        ctx.cmp) != -1) {
+                    // Found the parent; the finally block releases the page that is still latched.
+                    ctx.pathList.clear();
+                    fillPath(ctx, pageIndex);
+                    return;
+                }
+                node.releaseReadLatch();
+                readLatched = false;
+                bufferCache.unpin(node);
+            }
+        } finally {
+            if (readLatched) {
+                node.releaseReadLatch();
+                readLatched = false;
+                bufferCache.unpin(node);
+            }
+        }
+    }
+
+    /**
+     * Copies the chain of entries that ends at {@code pageIndex} from {@code ctx.traverseList} into
+     * {@code ctx.pathList}. The recursion follows the parent indexes first, so ancestors are appended
+     * before the entry itself.
+     *
+     * @param ctx       operation context holding both lists
+     * @param pageIndex index into the traverse list, or -1 to stop
+     */
+    private void fillPath(RTreeOpContext ctx, int pageIndex) {
+        if (pageIndex == -1) {
+            // -1 terminates the chain of parent indexes.
+            return;
+        }
+        int parentIndex = ctx.traverseList.getPageIndex(pageIndex);
+        fillPath(ctx, parentIndex);
+        ctx.pathList.add(ctx.traverseList.getPageId(pageIndex), ctx.traverseList.getPageLsn(pageIndex), -1);
+    }
+
+    private void delete(ITupleReference tuple, RTreeOpContext ctx) throws HyracksDataException, TreeIndexException {
+        ctx.reset();
+        ctx.setTuple(tuple);
+        ctx.splitKey.reset();
+        ctx.splitKey.getLeftTuple().setFieldCount(cmpFactories.length);
+
+        // We delete the first matching tuple (including the payload data.
+        // We don't update the MBRs of the parents after deleting the record.
+        int tupleIndex = findTupleToDelete(ctx);
+
+        if (tupleIndex != -1) {
+            try {
+                deleteTuple(tupleIndex, ctx);
+            } finally {
+                ctx.leafFrame.getPage().releaseWriteLatch();
+                bufferCache.unpin(ctx.leafFrame.getPage());
+            }
+        }
+    }
+
+    /**
+     * Locates the tuple {@code ctx.tuple} in a leaf page.
+     * <p>
+     * Pages are visited from {@code ctx.pathList}, starting at the root, under a read latch. Interior pages
+     * contribute every child returned by {@code getChildPageIdIfIntersect}. When a leaf contains the tuple,
+     * the read latch is released and the page is re-pinned under a write latch; the lookup is repeated if
+     * the page LSN changed in between.
+     *
+     * @param ctx operation context holding the tuple, the path list and the frames
+     * @return the index of the tuple in the leaf set on {@code ctx.leafFrame}, in which case that page is
+     *         left pinned and write-latched; or -1 if no matching tuple was found
+     */
+    private int findTupleToDelete(RTreeOpContext ctx) throws HyracksDataException {
+        boolean writeLatched = false;
+        boolean readLatched = false;
+        boolean succeeded = false;
+        ICachedPage node = null;
+        ctx.pathList.add(rootPage, -1, -1);
+
+        try {
+            while (!ctx.pathList.isEmpty()) {
+                int pageId = ctx.pathList.getLastPageId();
+                long parentLsn = ctx.pathList.getLastPageLsn();
+                ctx.pathList.moveLast();
+                node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                node.acquireReadLatch();
+                readLatched = true;
+                ctx.interiorFrame.setPage(node);
+                boolean isLeaf = ctx.interiorFrame.isLeaf();
+                long pageLsn = ctx.interiorFrame.getPageLsn();
+
+                if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                    // Concurrent split detected, we need to visit the right
+                    // page
+                    int rightPage = ctx.interiorFrame.getRightPage();
+                    if (rightPage != -1) {
+                        ctx.pathList.add(rightPage, parentLsn, -1);
+                    }
+                }
+
+                if (!isLeaf) {
+                    // Queue every child for which getChildPageIdIfIntersect returns a page id.
+                    for (int i = 0; i < ctx.interiorFrame.getTupleCount(); i++) {
+                        int childPageId = ctx.interiorFrame.getChildPageIdIfIntersect(ctx.tuple, i, ctx.cmp);
+                        if (childPageId != -1) {
+                            ctx.pathList.add(childPageId, pageLsn, -1);
+                        }
+                    }
+                } else {
+                    ctx.leafFrame.setPage(node);
+                    int tupleIndex = ctx.leafFrame.findTupleIndex(ctx.tuple, ctx.cmp);
+                    if (tupleIndex != -1) {
+
+                        // Swap the read latch for a write latch: release, re-pin, then re-latch.
+                        node.releaseReadLatch();
+                        readLatched = false;
+                        bufferCache.unpin(node);
+
+                        node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                        node.acquireWriteLatch();
+                        writeLatched = true;
+                        ctx.leafFrame.setPage(node);
+
+                        // A rare case only happen when a root is no longer a
+                        // leaf page. Simply we restart the search.
+                        if (!ctx.leafFrame.isLeaf()) {
+                            ctx.pathList.add(pageId, -1, -1);
+
+                            node.releaseWriteLatch();
+                            writeLatched = false;
+                            bufferCache.unpin(node);
+                            continue;
+                        }
+
+                        if (ctx.leafFrame.getPageLsn() != pageLsn) {
+                            // The page was changed while we unlocked it
+
+                            tupleIndex = ctx.leafFrame.findTupleIndex(ctx.tuple, ctx.cmp);
+                            if (tupleIndex == -1) {
+                                // The tuple is no longer on this page; queue the page again and retry.
+                                ctx.pathList.add(pageId, parentLsn, -1);
+
+                                node.releaseWriteLatch();
+                                writeLatched = false;
+                                bufferCache.unpin(node);
+                                continue;
+                            } else {
+                                succeeded = true;
+                                return tupleIndex;
+                            }
+                        } else {
+                            succeeded = true;
+                            return tupleIndex;
+                        }
+                    }
+                }
+                node.releaseReadLatch();
+                readLatched = false;
+                bufferCache.unpin(node);
+            }
+        } finally {
+            // On success the leaf stays write-latched and pinned; otherwise release what is held.
+            if (!succeeded) {
+                if (readLatched) {
+                    node.releaseReadLatch();
+                    readLatched = false;
+                    bufferCache.unpin(node);
+                } else if (writeLatched) {
+                    node.releaseWriteLatch();
+                    writeLatched = false;
+                    bufferCache.unpin(node);
+                }
+            }
+        }
+        return -1;
+    }
+
+    private void deleteTuple(int tupleIndex, RTreeOpContext ctx) throws HyracksDataException {
+        ctx.leafFrame.delete(tupleIndex, ctx.cmp);
+        ctx.leafFrame.setPageLsn(incrementGlobalNsn());
+    }
+
+    private void search(ITreeIndexCursor cursor, ISearchPredicate searchPred, RTreeOpContext ctx)
+            throws HyracksDataException, TreeIndexException {
+        ctx.reset();
+        ctx.cursor = cursor;
+
+        cursor.setBufferCache(bufferCache);
+        cursor.setFileId(fileId);
+        ctx.cursorInitialState.setRootPage(rootPage);
+        ctx.cursor.open(ctx.cursorInitialState, (SearchPredicate) searchPred);
+    }
+
+    /** Returns the factory used to create interior frames for this tree. */
+    @Override
+    public ITreeIndexFrameFactory getInteriorFrameFactory() {
+        return this.interiorFrameFactory;
+    }
+
+    /** Returns the factory used to create leaf frames for this tree. */
+    @Override
+    public ITreeIndexFrameFactory getLeafFrameFactory() {
+        return this.leafFrameFactory;
+    }
+
+    /** Returns the comparator factories of this tree (the array itself, not a copy). */
+    @Override
+    public IBinaryComparatorFactory[] getComparatorFactories() {
+        return this.cmpFactories;
+    }
+
+    /** Returns the free page manager this tree allocates pages from. */
+    @Override
+    public IFreePageManager getFreePageManager() {
+        return this.freePageManager;
+    }
+
+    private void update(ITupleReference tuple, RTreeOpContext ctx) {
+        throw new UnsupportedOperationException("RTree Update not implemented.");
+    }
+
+    /**
+     * Checks whether the root page is at level 0 and holds no tuples.
+     * The root page is pinned and read-latched for the duration of the check.
+     *
+     * @param leafFrame frame used to inspect the root page; its page is set to the root
+     * @return {@code true} if the root is at level 0 with a tuple count of 0
+     */
+    public boolean isEmptyTree(IRTreeLeafFrame leafFrame) throws HyracksDataException {
+        final ICachedPage root = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
+        root.acquireReadLatch();
+        try {
+            leafFrame.setPage(root);
+            return leafFrame.getLevel() == 0 && leafFrame.getTupleCount() == 0;
+        } finally {
+            root.releaseReadLatch();
+            bufferCache.unpin(root);
+        }
+    }
+
+    /**
+     * Bulk-load context for this tree. It only carries an index accessor; the constructor arguments are
+     * accepted but not used.
+     */
+    public final class BulkLoadContext implements IIndexBulkLoadContext {
+
+        /** Accessor obtained from the enclosing tree when the context is created. */
+        public ITreeIndexAccessor indexAccessor;
+
+        public BulkLoadContext(float fillFactor, IRTreeFrame leafFrame, IRTreeFrame interiorFrame,
+                ITreeIndexMetaDataFrame metaFrame) throws HyracksDataException {
+            this.indexAccessor = createAccessor();
+        }
+    }
+
+    /**
+     * Starts a bulk load. The tree must be empty.
+     *
+     * @param fillFactor passed through to the bulk-load context
+     * @return a new {@link BulkLoadContext}
+     * @throws HyracksDataException if the tree is not empty
+     */
+    @Override
+    public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws HyracksDataException {
+        final IRTreeLeafFrame probeFrame = (IRTreeLeafFrame) leafFrameFactory.createFrame();
+        final boolean empty = isEmptyTree(probeFrame);
+        if (!empty) {
+            throw new HyracksDataException("Trying to Bulk-load a non-empty RTree.");
+        }
+
+        final IRTreeFrame leafFrame = (IRTreeFrame) leafFrameFactory.createFrame();
+        final IRTreeFrame interiorFrame = (IRTreeFrame) interiorFrameFactory.createFrame();
+        final ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
+        return new BulkLoadContext(fillFactor, leafFrame, interiorFrame, metaFrame);
+    }
+
+    @Override
+    public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
+        try {
+            ((BulkLoadContext) ictx).indexAccessor.insert(tuple);
+        } catch (Exception e) {
+            throw new HyracksDataException("BulkLoad Error", e);
+        }
+    }
+
+    /**
+     * Ends a bulk load. This implementation does nothing.
+     */
+    @Override
+    public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
+    }
+
+    private void diskOrderScan(ITreeIndexCursor icursor, RTreeOpContext ctx) throws HyracksDataException {
+        TreeDiskOrderScanCursor cursor = (TreeDiskOrderScanCursor) icursor;
+        ctx.reset();
+
+        MultiComparator cmp = MultiComparator.create(cmpFactories);
+        SearchPredicate searchPred = new SearchPredicate(null, cmp);
+
+        int currentPageId = rootPage + 1;
+        int maxPageId = freePageManager.getMaxPage(ctx.metaFrame);
+
+        ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
+        page.acquireReadLatch();
+        try {
+            cursor.setBufferCache(bufferCache);
+            cursor.setFileId(fileId);
+            cursor.setCurrentPageId(currentPageId);
+            cursor.setMaxPageId(maxPageId);
+            ctx.cursorInitialState.setPage(page);
+            cursor.open(ctx.cursorInitialState, searchPred);
+        } catch (Exception e) {
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Returns the page id of the root page. */
+    @Override
+    public int getRootPageId() {
+        return this.rootPage;
+    }
+
+    /** Returns the field count configured for this tree. */
+    @Override
+    public int getFieldCount() {
+        return this.fieldCount;
+    }
+
+    /** Identifies this index as {@code IndexType.RTREE}. */
+    @Override
+    public IndexType getIndexType() {
+        return IndexType.RTREE;
+    }
+
+    /** Creates a new accessor bound to this tree. */
+    @Override
+    public ITreeIndexAccessor createAccessor() {
+        final ITreeIndexAccessor accessor = new RTreeAccessor(this);
+        return accessor;
+    }
+
+    /**
+     * Accessor that runs index operations against an {@code RTree}. Each accessor owns one operation
+     * context, created from the tree when the accessor is constructed and reused for every call.
+     */
+    public class RTreeAccessor implements ITreeIndexAccessor {
+        // Both references are assigned once in the constructor and never replaced.
+        private final RTree rtree;
+        private final RTreeOpContext ctx;
+
+        /**
+         * @param rtree tree to operate on; also supplies the operation context
+         */
+        public RTreeAccessor(RTree rtree) {
+            this.rtree = rtree;
+            this.ctx = rtree.createOpContext();
+        }
+
+        /** Inserts {@code tuple} into the tree. */
+        @Override
+        public void insert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+            ctx.reset(IndexOp.INSERT);
+            rtree.insert(tuple, ctx);
+        }
+
+        /** Delegates to the tree's update operation. */
+        @Override
+        public void update(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+            ctx.reset(IndexOp.UPDATE);
+            rtree.update(tuple, ctx);
+        }
+
+        /** Deletes {@code tuple} from the tree. */
+        @Override
+        public void delete(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+            ctx.reset(IndexOp.DELETE);
+            rtree.delete(tuple, ctx);
+        }
+
+        /** Creates a search cursor backed by fresh interior and leaf frames. */
+        @Override
+        public ITreeIndexCursor createSearchCursor() {
+            return new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrameFactory.createFrame(),
+                    (IRTreeLeafFrame) leafFrameFactory.createFrame());
+        }
+
+        /**
+         * Opens {@code cursor} for a search with {@code searchPred}.
+         *
+         * @param cursor     cursor to open; must be an {@code ITreeIndexCursor}
+         * @param searchPred search predicate
+         */
+        @Override
+        public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException,
+                IndexException {
+            ctx.reset(IndexOp.SEARCH);
+            rtree.search((ITreeIndexCursor) cursor, searchPred, ctx);
+        }
+
+        /** Creates a disk-order scan cursor backed by a fresh leaf frame. */
+        @Override
+        public ITreeIndexCursor createDiskOrderScanCursor() {
+            return new TreeDiskOrderScanCursor(leafFrameFactory.createFrame());
+        }
+
+        /** Opens {@code cursor} for a disk-order scan of the tree. */
+        @Override
+        public void diskOrderScan(ITreeIndexCursor cursor) throws HyracksDataException {
+            ctx.reset(IndexOp.DISKORDERSCAN);
+            rtree.diskOrderScan(cursor, ctx);
+        }
+
+        /** Returns the operation context used by this accessor. */
+        public RTreeOpContext getOpContext() {
+            return ctx;
+        }
+
+        /**
+         * Not supported.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public void upsert(ITupleReference tuple) throws HyracksDataException, TreeIndexException {
+            throw new UnsupportedOperationException(
+                    "The RTree does not support the notion of keys, therefore upsert does not make sense.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
new file mode 100644
index 0000000..ac1eb7d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+/**
+ * Initial state handed to a cursor when it is opened: a path list, the root page id and, for disk-order
+ * scans, the first page.
+ */
+public class RTreeCursorInitialState implements ICursorInitialState {
+
+	// Assigned once in the constructor; there is no setter.
+	private final PathList pathList;
+	private int rootPage;
+	private ICachedPage page; // for disk order scan
+
+	/**
+	 * @param pathList path list made available to the cursor
+	 * @param rootPage initial root page id; can be changed with {@link #setRootPage(int)}
+	 */
+	public RTreeCursorInitialState(PathList pathList, int rootPage) {
+		this.pathList = pathList;
+		this.rootPage = rootPage;
+	}
+
+	/** Returns the path list given at construction. */
+	public PathList getPathList() {
+		return pathList;
+	}
+
+	/** Returns the current root page id. */
+	public int getRootPage() {
+		return rootPage;
+	}
+
+	/** Sets the root page id. */
+	public void setRootPage(int rootPage) {
+		this.rootPage = rootPage;
+	}
+
+	/** Returns the page set for a disk-order scan, or {@code null} if none was set. */
+	public ICachedPage getPage() {
+		return page;
+	}
+
+	/** Sets the page used for a disk-order scan. */
+	public void setPage(ICachedPage page) {
+		this.page = page;
+	}
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
new file mode 100644
index 0000000..6683444
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+/**
+ * Per-operation state for R-tree operations: the frames and comparator to work with, the current
+ * operation, the split key, and the lists used while descending and ascending the tree.
+ */
+public class RTreeOpContext implements IIndexOpContext {
+    private static final int INITIAL_TRAVERSE_LIST_SIZE = 100;
+    public final MultiComparator cmp;
+    public final IRTreeInteriorFrame interiorFrame;
+    public final IRTreeLeafFrame leafFrame;
+    public IndexOp op;
+    public ITreeIndexCursor cursor;
+    public RTreeCursorInitialState cursorInitialState;
+    public ITreeIndexMetaDataFrame metaFrame;
+    public RTreeSplitKey splitKey;
+    public ITupleReference tuple;
+    // Used to record the pageIds and pageLsns of the visited pages.
+    public PathList pathList;
+    // Used for traversing the tree.
+    public PathList traverseList;
+
+    // Pages collected during an operation for NSN and LSN updates respectively.
+    public ArrayList<ICachedPage> NSNUpdates;
+    public ArrayList<ICachedPage> LSNUpdates;
+
+    /**
+     * @param leafFrame      frame used to access leaf pages
+     * @param interiorFrame  frame used to access interior pages
+     * @param metaFrame      frame used to access the meta-data page
+     * @param cmpFactories   factories from which the comparator is built
+     * @param treeHeightHint sizing hint for the path list
+     */
+    public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame,
+            ITreeIndexMetaDataFrame metaFrame, IBinaryComparatorFactory[] cmpFactories, int treeHeightHint) {
+        this.cmp = MultiComparator.create(cmpFactories);
+        this.interiorFrame = interiorFrame;
+        this.leafFrame = leafFrame;
+        this.metaFrame = metaFrame;
+        pathList = new PathList(treeHeightHint, treeHeightHint);
+        NSNUpdates = new ArrayList<ICachedPage>();
+        LSNUpdates = new ArrayList<ICachedPage>();
+    }
+
+    /** Returns the tuple the current operation works on. */
+    public ITupleReference getTuple() {
+        return tuple;
+    }
+
+    /** Sets the tuple the current operation works on. */
+    public void setTuple(ITupleReference tuple) {
+        this.tuple = tuple;
+    }
+
+    /**
+     * Clears the path list, the traverse list (if allocated) and both update lists.
+     */
+    public void reset() {
+        if (pathList != null) {
+            pathList.clear();
+        }
+        if (traverseList != null) {
+            traverseList.clear();
+        }
+        NSNUpdates.clear();
+        LSNUpdates.clear();
+    }
+
+    /**
+     * Switches the context to {@code newOp}, lazily allocating the structures that operation needs.
+     * Does nothing when {@code newOp} is already the current operation.
+     *
+     * @param newOp operation about to be executed
+     */
+    @Override
+    public void reset(IndexOp newOp) {
+        if (op != null && newOp == op) {
+            return;
+        }
+        // Decide on the operation being switched to, not the previous one: the split key and
+        // the traverse list are only needed by operations other than SEARCH and DISKORDERSCAN.
+        if (newOp != IndexOp.SEARCH && newOp != IndexOp.DISKORDERSCAN) {
+            if (splitKey == null) {
+                splitKey = new RTreeSplitKey(interiorFrame.getTupleWriter().createTupleReference(), interiorFrame
+                        .getTupleWriter().createTupleReference());
+            }
+            if (traverseList == null) {
+                traverseList = new PathList(INITIAL_TRAVERSE_LIST_SIZE, INITIAL_TRAVERSE_LIST_SIZE);
+            }
+        }
+        if (cursorInitialState == null) {
+            // 1 is the initial root page id; it can be replaced later through setRootPage.
+            cursorInitialState = new RTreeCursorInitialState(pathList, 1);
+        }
+        this.op = newOp;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
new file mode 100644
index 0000000..ee7ec5f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public class RTreeSearchCursor implements ITreeIndexCursor {
+
+    private int fileId = -1;
+    private ICachedPage page = null;
+    private IRTreeInteriorFrame interiorFrame = null;
+    private IRTreeLeafFrame leafFrame = null;
+    private IBufferCache bufferCache = null;
+
+    private SearchPredicate pred;
+    private PathList pathList;
+    private int rootPage;
+    private ITupleReference searchKey;
+
+    private int tupleIndex = 0;
+    private int tupleIndexInc = 0;
+    private int currentTupleIndex = 0;
+    private int pageId = -1;
+
+    private MultiComparator cmp;
+
+    private ITreeIndexTupleReference frameTuple;
+    private boolean readLatched = false;
+
+    public RTreeSearchCursor(IRTreeInteriorFrame interiorFrame, IRTreeLeafFrame leafFrame) {
+        this.interiorFrame = interiorFrame;
+        this.leafFrame = leafFrame;
+        this.frameTuple = leafFrame.createTupleReference();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (readLatched) { // a leaf page is still latched and pinned by this cursor
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+            readLatched = false;
+        }
+        tupleIndex = 0;
+        tupleIndexInc = 0;
+        page = null; // hasNext() returns false while page is null
+        pathList = null;
+    }
+
+    public ITupleReference getTuple() {
+        return frameTuple;
+    }
+
+    public int getTupleOffset() {
+        return leafFrame.getTupleOffset(currentTupleIndex);
+    }
+
+    public int getPageId() {
+        return pageId;
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        return page;
+    }
+
+    private boolean fetchNextLeafPage() throws HyracksDataException {
+        boolean succeeded = false; // set once a leaf has been latched for the caller
+        if (readLatched) { // give back the previously returned leaf first
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+            readLatched = false;
+        }
+
+        while (!pathList.isEmpty()) {
+            int pageId = pathList.getLastPageId(); // local variable; shadows the pageId field
+            long parentLsn = pathList.getLastPageLsn();
+            pathList.moveLast();
+            ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            node.acquireReadLatch();
+            readLatched = true;
+            try {
+                interiorFrame.setPage(node);
+                boolean isLeaf = interiorFrame.isLeaf();
+                long pageLsn = interiorFrame.getPageLsn();
+
+                if (pageId != rootPage && parentLsn < interiorFrame.getPageNsn()) {
+                    // Concurrent split detected, we need to visit the right
+                    // page
+                    int rightPage = interiorFrame.getRightPage();
+                    if (rightPage != -1) {
+                        pathList.add(rightPage, parentLsn, -1);
+                    }
+                }
+
+                if (!isLeaf) {
+                    if (searchKey != null) { // queue only the children reported as intersecting the key
+                        for (int i = 0; i < interiorFrame.getTupleCount(); i++) {
+                            int childPageId = interiorFrame.getChildPageIdIfIntersect(searchKey, i, cmp);
+                            if (childPageId != -1) {
+                                pathList.add(childPageId, pageLsn, -1);
+                            }
+                        }
+                    } else { // no search key: queue every child
+                        for (int i = 0; i < interiorFrame.getTupleCount(); i++) {
+                            int childPageId = interiorFrame.getChildPageId(i);
+                            pathList.add(childPageId, pageLsn, -1);
+                        }
+                    }
+
+                } else {
+                    page = node;
+                    this.pageId = pageId; // This is only needed for the
+                                          // LSMRTree flush operation
+                    leafFrame.setPage(page);
+                    tupleIndex = 0;
+                    succeeded = true; // the leaf stays latched and pinned for the caller
+                    return true;
+                }
+            } finally {
+                if (!succeeded) { // interior node or exception: do not keep the latch/pin
+                    if (readLatched) {
+                        node.releaseReadLatch();
+                        readLatched = false;
+                        bufferCache.unpin(node);
+                    }
+                }
+            }
+        }
+        return false; // path list exhausted without reaching another leaf
+    }
+
+    /**
+     * Looks for the next qualifying tuple, moving on to further leaf pages as
+     * needed.
+     *
+     * @return true if such a tuple was found, false when none is left
+     */
+    @Override
+    public boolean hasNext() throws HyracksDataException {
+        // No leaf page is positioned (page is null).
+        if (page == null) {
+            return false;
+        }
+
+        // The current leaf is used up: advance to the next candidate leaf.
+        if (tupleIndex == leafFrame.getTupleCount() && !fetchNextLeafPage()) {
+            return false;
+        }
+
+        do {
+            for (int idx = tupleIndex; idx < leafFrame.getTupleCount(); idx++) {
+                // Without a search key every tuple qualifies; otherwise it
+                // must intersect the search key.
+                if (searchKey != null && !leafFrame.intersect(searchKey, idx, cmp)) {
+                    continue;
+                }
+                frameTuple.resetByTupleIndex(leafFrame, idx);
+                // currentTupleIndex is only needed for the LSMRTree flush operation
+                currentTupleIndex = idx;
+                tupleIndexInc = idx + 1;
+                return true;
+            }
+        } while (fetchNextLeafPage());
+        return false;
+    }
+
+    @Override
+    public void next() throws HyracksDataException {
+        tupleIndex = tupleIndexInc;
+    }
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+        if (this.page != null) {
+            // Reopened without close(): release only a latch still held (an exhausted search already gave it back).
+            if (readLatched) {
+                this.page.releaseReadLatch();
+                readLatched = false;
+                bufferCache.unpin(this.page);
+            }
+            pathList.clear();
+        }
+
+        pathList = ((RTreeCursorInitialState) initialState).getPathList();
+        rootPage = ((RTreeCursorInitialState) initialState).getRootPage();
+        pred = (SearchPredicate) searchPred;
+        cmp = pred.getLowKeyComparator();
+        searchKey = pred.getSearchKey(); // may be null, in which case every tuple qualifies
+
+        if (searchKey != null) {
+            int maxFieldPos = cmp.getKeyFieldCount() / 2;
+            for (int i = 0; i < maxFieldPos; i++) {
+                int j = maxFieldPos + i; // high-point field paired with low-point field i
+                int c = cmp.getComparators()[i].compare(searchKey.getFieldData(i), searchKey.getFieldStart(i),
+                        searchKey.getFieldLength(i), searchKey.getFieldData(j), searchKey.getFieldStart(j),
+                        searchKey.getFieldLength(j));
+                if (c > 0) {
+                    throw new IllegalArgumentException(
+                            "The low key point has larger coordinates than the high key point.");
+                }
+            }
+        }
+        pathList.add(this.rootPage, -1, -1); // start the traversal at the root page
+        tupleIndex = 0;
+        fetchNextLeafPage(); // position on the first candidate leaf, if there is one
+    }
+
+    @Override
+    public void reset() { // resetting is implemented as close()
+        try {
+            close();
+        } catch (Exception e) {
+            e.printStackTrace(); // NOTE(review): the failure is only printed, not propagated -- confirm intended
+        }
+    }
+
+    @Override
+    public void setBufferCache(IBufferCache bufferCache) {
+        this.bufferCache = bufferCache;
+    }
+
+    @Override
+    public void setFileId(int fileId) {
+        this.fileId = fileId;
+    }
+
+    @Override
+    public boolean exclusiveLatchNodes() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSplitKey.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSplitKey.java
new file mode 100644
index 0000000..ecbcd38
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSplitKey.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+
+public class RTreeSplitKey implements ISplitKey {
+	public byte[] leftPageData = null;
+	public ByteBuffer leftPageBuf = null;
+	public ITreeIndexTupleReference leftTuple;
+
+	public byte[] rightPageData = null;
+	public ByteBuffer rightPageBuf = null;
+	public ITreeIndexTupleReference rightTuple;
+
+	public int keySize = 0;
+
+	public RTreeSplitKey(ITreeIndexTupleReference leftTuple,
+			ITreeIndexTupleReference rightTuple) {
+		this.leftTuple = leftTuple;
+		this.rightTuple = rightTuple;
+	}
+
+	/**
+	 * Prepares the left and right key buffers for a key of the given size.
+	 *
+	 * @param keySize size of the key in bytes; also the offset of the page id
+	 */
+	public void initData(int keySize) {
+		// Each buffer holds the key bytes followed by 4 bytes for the page.
+		final int required = keySize + 4;
+		this.keySize = keySize;
+
+		// Keep an existing array (e.g. from a lower-level split) when it is
+		// already large enough; otherwise allocate and wrap a fresh one.
+		if (leftPageData == null || leftPageData.length < required) {
+			leftPageData = new byte[required];
+			leftPageBuf = ByteBuffer.wrap(leftPageData);
+		}
+		if (rightPageData == null || rightPageData.length < required) {
+			rightPageData = new byte[required];
+			rightPageBuf = ByteBuffer.wrap(rightPageData);
+		}
+
+		// Point both tuple references at the start of their buffers.
+		leftTuple.resetByTupleOffset(leftPageBuf, 0);
+		rightTuple.resetByTupleOffset(rightPageBuf, 0);
+	}
+
+	public void resetLeftPage() {
+		leftPageData = null;
+		leftPageBuf = null;
+	}
+
+	public void resetRightPage() {
+		rightPageData = null;
+		rightPageBuf = null;
+	}
+
+	public ByteBuffer getLeftPageBuffer() {
+		return leftPageBuf;
+	}
+
+	public ByteBuffer getRightPageBuffer() {
+		return rightPageBuf;
+	}
+
+	public ITreeIndexTupleReference getLeftTuple() {
+		return leftTuple;
+	}
+
+	public ITreeIndexTupleReference getRightTuple() {
+		return rightTuple;
+	}
+
+	public int getLeftPage() {
+		return leftPageBuf.getInt(keySize);
+	}
+
+	public int getRightPage() {
+		return rightPageBuf.getInt(keySize);
+	}
+
+	public void setLeftPage(int page) {
+		leftPageBuf.putInt(keySize, page);
+	}
+
+	public void setRightPage(int page) {
+		rightPageBuf.putInt(keySize, page);
+	}
+
+	public ISplitKey duplicate(ITreeIndexTupleReference copyLeftTuple,
+			ITreeIndexTupleReference copyRightTuple) {
+		RTreeSplitKey copy = new RTreeSplitKey(copyLeftTuple, copyRightTuple);
+		copy.keySize = keySize; // the page accessors of the copy read/write at offset keySize
+		copy.leftPageData = leftPageData.clone(); // deep copy: the duplicate must not share buffers
+		copy.leftPageBuf = ByteBuffer.wrap(copy.leftPageData);
+		copy.leftTuple.setFieldCount(leftTuple.getFieldCount());
+		copy.leftTuple.resetByTupleOffset(copy.leftPageBuf, 0);
+		copy.rightPageData = rightPageData.clone();
+		copy.rightPageBuf = ByteBuffer.wrap(copy.rightPageData);
+		copy.rightTuple.setFieldCount(rightTuple.getFieldCount());
+		copy.rightTuple.resetByTupleOffset(copy.rightPageBuf, 0);
+		return copy;
+	}
+
+	@Override
+	public void reset() {
+		leftPageData = null;
+		leftPageBuf = null;
+		rightPageData = null;
+		rightPageBuf = null;
+	}
+
+	@Override
+	public ByteBuffer getBuffer() {
+		// Not implemented: always returns null. See getLeftPageBuffer()/getRightPageBuffer().
+		return null;
+	}
+
+	@Override
+	public ITreeIndexTupleReference getTuple() {
+		// Not implemented: always returns null. See getLeftTuple()/getRightTuple().
+		return null;
+	}
+
+	@Override
+	public void setPages(int leftPage, int rightPage) {
+		leftPageBuf.putInt(keySize, leftPage);
+		rightPageBuf.putInt(keySize, rightPage);
+	}
+
+	@Override
+	public ISplitKey duplicate(ITreeIndexTupleReference copyTuple) {
+		// Not implemented: always returns null. See duplicate(copyLeftTuple, copyRightTuple).
+		return null;
+	}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
new file mode 100644
index 0000000..cb9b160
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+
+public class Rectangle {
+	private int dim;
+	private double[] low;
+	private double[] high;
+
+	public Rectangle(int dim) {
+		this.dim = dim;
+		low = new double[this.dim];
+		high = new double[this.dim];
+	}
+
+	public int getDim() {
+		return dim;
+	}
+
+	public double getLow(int i) {
+		return low[i];
+	}
+
+	public double getHigh(int i) {
+		return high[i];
+	}
+
+	public void setLow(int i, double value) {
+		low[i] = value;
+	}
+
+	public void setHigh(int i, double value) {
+		high[i] = value;
+	}
+
+	public void set(ITupleReference tuple, IPrimitiveValueProvider[] valueProviders) {
+		for (int d = 0; d < getDim(); d++) {
+			final int highField = d + getDim(); // field d is the low value, field d + dim the high value
+			final double lowValue = valueProviders[d].getValue(tuple.getFieldData(d), tuple.getFieldStart(d));
+			setLow(d, lowValue);
+			setHigh(d, valueProviders[highField].getValue(tuple.getFieldData(highField),
+					tuple.getFieldStart(highField)));
+		}
+	}
+
+	public void enlarge(ITupleReference tupleToBeInserted, IPrimitiveValueProvider[] valueProviders) {
+		// Grow each side only where the tuple reaches beyond the current bounds.
+		for (int d = 0; d < getDim(); d++) {
+			final int highField = getDim() + d;
+			final double newLow = valueProviders[d].getValue(tupleToBeInserted.getFieldData(d),
+					tupleToBeInserted.getFieldStart(d));
+			if (newLow < getLow(d)) {
+				setLow(d, newLow);
+			}
+			final double newHigh = valueProviders[highField].getValue(
+					tupleToBeInserted.getFieldData(highField),
+					tupleToBeInserted.getFieldStart(highField));
+			if (newHigh > getHigh(d)) {
+				setHigh(d, newHigh);
+			}
+		}
+	}
+
+	public double margin() {
+		final double weight = Math.pow(2, (double) getDim() - 1.0); // 2^(dim - 1), applied to every edge
+		double total = 0.0;
+		for (int d = 0; d < getDim(); d++) {
+			total += weight * (getHigh(d) - getLow(d));
+		}
+		return total;
+	}
+
+	public double overlappedArea(Rectangle rec) {
+		// Product of the per-dimension overlap extents; 0.0 when the rectangles are disjoint.
+		double product = 1.0;
+		for (int d = 0; d < getDim(); d++) {
+			// disjoint along one dimension means no overlap at all
+			if (getLow(d) > rec.getHigh(d) || getHigh(d) < rec.getLow(d)) {
+				return 0.0;
+			}
+			// bounds of the shared interval in this dimension
+			final double from = Math.max(getLow(d), rec.getLow(d));
+			final double to = Math.min(getHigh(d), rec.getHigh(d));
+			product *= to - from;
+		}
+		return product;
+	}
+
+	public double area() {
+		double product = 1.0; // multiplied by the extent of every dimension
+		for (int d = 0; d < getDim(); d++) {
+			product *= getHigh(d) - getLow(d);
+		}
+		return product;
+	}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SearchPredicate.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SearchPredicate.java
new file mode 100644
index 0000000..c8d5816
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SearchPredicate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public class SearchPredicate implements ISearchPredicate {
+
+	private static final long serialVersionUID = 1L;
+
+	protected ITupleReference searchKey;
+	protected MultiComparator cmp;
+
+	public SearchPredicate(ITupleReference searchKey, MultiComparator cmp) {
+		this.searchKey = searchKey;
+		this.cmp = cmp;
+	}
+
+	public ITupleReference getSearchKey() {
+		return searchKey;
+	}
+
+	public void setSearchKey(ITupleReference searchKey) {
+		this.searchKey = searchKey;
+	}
+
+	public MultiComparator getLowKeyComparator() {
+		return cmp;
+	}
+
+	public MultiComparator getHighKeyComparator() {
+		return cmp;
+	}
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java
new file mode 100644
index 0000000..6cb84a4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+
+public class TupleEntry implements Comparable<TupleEntry> {
+	private int tupleIndex;
+	private double value;
+
+	public TupleEntry() {
+	}
+
+	public int getTupleIndex() {
+		return tupleIndex;
+	}
+
+	public void setTupleIndex(int tupleIndex) {
+		this.tupleIndex = tupleIndex;
+	}
+
+	public double getValue() {
+		return value;
+	}
+
+	public void setValue(double value) {
+		this.value = value;
+	}
+
+	public int compareTo(TupleEntry tupleEntry) {
+		// Values whose difference does not exceed doubleEpsilon() compare as equal (0).
+		final double mine = this.getValue();
+		final double theirs = tupleEntry.getValue();
+		if (mine - theirs > RTreeNSMFrame.doubleEpsilon()) {
+			return 1;
+		}
+		return (theirs - mine > RTreeNSMFrame.doubleEpsilon()) ? -1 : 0;
+	}
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java
new file mode 100644
index 0000000..7bd2334
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class TupleEntryArrayList {
+	private TupleEntry[] data;
+	private int size;
+	private final int growth;
+
+	public TupleEntryArrayList(int initialCapacity, int growth) {
+		data = new TupleEntry[initialCapacity];
+		size = 0;
+		this.growth = growth;
+	}
+
+	public int size() {
+		return size;
+	}
+
+	public void add(int tupleIndex, double value) {
+		if (size == data.length) {
+			TupleEntry[] newData = new TupleEntry[data.length + growth];
+			System.arraycopy(data, 0, newData, 0, data.length);
+			data = newData;
+		}
+		if (data[size] == null) {
+			data[size] = new TupleEntry();
+		}
+		data[size].setTupleIndex(tupleIndex);
+		data[size].setValue(value);
+		size++;
+	}
+
+	public void removeLast() {
+		if (size > 0)
+			size--;
+	}
+
+	// WARNING: caller is responsible for checking size > 0
+	public TupleEntry getLast() {
+		return data[size - 1];
+	}
+
+	public TupleEntry get(int i) {
+		return data[i];
+	}
+
+	public void clear() {
+		size = 0;
+	}
+
+	public boolean isEmpty() {
+		return size == 0;
+	}
+
+	public void sort(EntriesOrder order, int tupleCount) {
+		// Only the first tupleCount slots of the backing array are sorted.
+		if (order != EntriesOrder.ASCENDING) {
+			Arrays.sort(data, 0, tupleCount, Collections.reverseOrder());
+			return;
+		}
+		Arrays.sort(data, 0, tupleCount);
+	}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
new file mode 100644
index 0000000..a2dcb6d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.frames.AbstractSlotManager;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+
+public class UnorderedSlotManager extends AbstractSlotManager {
+
+    @Override
+    public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+            FindTupleMode mode, FindTupleNoExactMatchPolicy matchPolicy) { // mode and matchPolicy are not used here
+        if (searchKey.getFieldCount() == frameTuple.getFieldCount()) { // otherwise nothing is compared: returns -1
+            int maxFieldPos = multiCmp.getKeyFieldCount() / 2; // key fields are compared in pairs: j and maxFieldPos + j
+            for (int i = 0; i < frame.getTupleCount(); i++) { // linear scan over all tuples of the frame
+                frameTuple.resetByTupleIndex(frame, i);
+
+                boolean foundTuple = true;
+                for (int j = 0; j < maxFieldPos; j++) {
+                    int k = maxFieldPos + j;
+                    int c1 = multiCmp.getComparators()[j].compare(frameTuple.getFieldData(j),
+                            frameTuple.getFieldStart(j), frameTuple.getFieldLength(j), searchKey.getFieldData(j),
+                            searchKey.getFieldStart(j), searchKey.getFieldLength(j));
+
+                    if (c1 != 0) {
+                        foundTuple = false;
+                        break;
+                    }
+                    int c2 = multiCmp.getComparators()[k].compare(frameTuple.getFieldData(k),
+                            frameTuple.getFieldStart(k), frameTuple.getFieldLength(k), searchKey.getFieldData(k),
+                            searchKey.getFieldStart(k), searchKey.getFieldLength(k));
+                    if (c2 != 0) {
+                        foundTuple = false;
+                        break;
+                    }
+                }
+                int remainingFieldCount = frameTuple.getFieldCount() - multiCmp.getKeyFieldCount(); // non-key fields
+                for (int j = multiCmp.getKeyFieldCount(); j < multiCmp.getKeyFieldCount() + remainingFieldCount; j++) {
+                    if (!compareField(searchKey, frameTuple, j)) { // non-key fields are compared byte by byte
+                        foundTuple = false;
+                        break;
+                    }
+                }
+                if (foundTuple) {
+                    return i; // index of the first tuple whose fields all match
+                }
+            }
+        }
+        return -1; // no matching tuple
+    }
+
+    private boolean compareField(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, int fIdx) {
+        int searchKeyFieldLength = searchKey.getFieldLength(fIdx);
+        int frameTupleFieldLength = frameTuple.getFieldLength(fIdx);
+
+        if (searchKeyFieldLength != frameTupleFieldLength) {
+            return false;
+        }
+
+        for (int i = 0; i < searchKeyFieldLength; i++) {
+            if (searchKey.getFieldData(fIdx)[i + searchKey.getFieldStart(fIdx)] != frameTuple.getFieldData(fIdx)[i
+                    + frameTuple.getFieldStart(fIdx)]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public int insertSlot(int tupleIndex, int tupleOff) { // tupleIndex is not used by this implementation
+        int slotOff = getSlotEndOff() - slotSize; // the new slot goes one slot below the current end offset
+        setSlot(slotOff, tupleOff);
+        return slotOff; // offset of the slot that was written
+    }
+
+    public void modifySlot(int slotOff, int tupleOff) {
+        setSlot(slotOff, tupleOff);
+    }
+
+    public void deleteSlot(int slotOff) { // copies slotSize bytes from getSlotEndOff() to slotOff + slotSize
+        System.arraycopy(frame.getBuffer().array(), getSlotEndOff(), frame.getBuffer().array(), slotOff + slotSize,
+                slotSize); // NOTE(review): destination is slotOff + slotSize rather than slotOff -- confirm intended
+    }
+
+    public void deleteEmptySlots() { // removes slots holding -1; presumably -1 marks a deleted tuple -- TODO confirm
+        int slotOff = getSlotStartOff(); // walks from the start offset downwards in slotSize steps
+        while (frame.getTupleCount() > 0) {
+            while (frame.getBuffer().getInt(getSlotEndOff()) == -1) { // drop -1 slots found at the end offset
+                ((RTreeNSMFrame) frame).setTupleCount(frame.getTupleCount() - 1);
+                if (frame.getTupleCount() == 0) {
+                    break;
+                }
+            }
+            if (frame.getTupleCount() == 0 || slotOff <= getSlotEndOff()) {
+                break; // nothing left, or the walk has reached the end offset
+            }
+            if (frame.getBuffer().getInt(slotOff) == -1) { // overwrite this -1 slot with the slot at the end offset
+                modifySlot(slotOff, frame.getBuffer().getInt(getSlotEndOff()));
+                ((RTreeNSMFrame) frame).setTupleCount(frame.getTupleCount() - 1);
+            }
+            slotOff -= slotSize;
+
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
new file mode 100644
index 0000000..898ebf8
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.tuples;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriter;
+
+public class RTreeTypeAwareTupleWriter extends TypeAwareTupleWriter {
+
+    public RTreeTypeAwareTupleWriter(ITypeTraits[] typeTraits) {
+        super(typeTraits);
+    }
+
+    public int writeTupleFields(ITreeIndexTupleReference[] refs, int startField, ByteBuffer targetBuf, int targetOff) {
+        int runner = targetOff; // current write position in targetBuf
+        int nullFlagsBytes = getNullFlagsBytes(refs.length);
+        // write null indicator bits (every byte is written as 0)
+        for (int i = 0; i < nullFlagsBytes; i++) {
+            targetBuf.put(runner++, (byte) 0);
+        }
+
+        // write field slots for variable length fields
+        // since the r-tree has fixed length keys, we don't actually need this?
+        encDec.reset(targetBuf.array(), runner);
+        for (int i = startField; i < startField + refs.length; i++) { // NOTE(review): refs[i] is in range only if startField == 0
+            if (!typeTraits[i].isFixedLength()) {
+                encDec.encode(refs[i].getFieldLength(i));
+            }
+        }
+        runner = encDec.getPos();
+
+        // write data: field i is copied from refs[i] (startField is not applied in this loop)
+        for (int i = 0; i < refs.length; i++) {
+            System.arraycopy(refs[i].getFieldData(i), refs[i].getFieldStart(i), targetBuf.array(), runner,
+                    refs[i].getFieldLength(i));
+            runner += refs[i].getFieldLength(i);
+        }
+        return runner - targetOff; // number of bytes written
+
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriterFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriterFactory.java
new file mode 100644
index 0000000..24b2b53
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.tuples;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
+
+/** Factory that creates RTreeTypeAwareTupleWriter instances over a fixed array of type traits. */
+public class RTreeTypeAwareTupleWriterFactory implements ITreeIndexTupleWriterFactory {
+    private static final long serialVersionUID = 1L;
+    private final ITypeTraits[] typeTraits;
+    /** @param typeTraits type traits handed to every writer created; the array is stored, not copied */
+    public RTreeTypeAwareTupleWriterFactory(ITypeTraits[] typeTraits) {
+        this.typeTraits = typeTraits;
+    }
+    /** Returns a new RTreeTypeAwareTupleWriter constructed from this factory's type traits. */
+    @Override
+    public ITreeIndexTupleWriter createTupleWriter() {
+        return new RTreeTypeAwareTupleWriter(typeTraits);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java
new file mode 100644
index 0000000..6f5d36f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/util/RTreeUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.util;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.data.PointablePrimitiveValueProviderFactory;
+import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+/** Static helpers for assembling an RTree and the comparators and value providers used with it. */
+public class RTreeUtils {
+    /** Wires tuple writer, frame and free-page factories together and returns a new RTree over bufferCache. */
+    public static RTree createRTree(IBufferCache bufferCache, ITypeTraits[] typeTraits,
+            IPrimitiveValueProviderFactory[] valueProviderFactories, IBinaryComparatorFactory[] cmpFactories) {
+        RTreeTypeAwareTupleWriterFactory writerFactory = new RTreeTypeAwareTupleWriterFactory(typeTraits);
+        ITreeIndexFrameFactory interior = new RTreeNSMInteriorFrameFactory(writerFactory, valueProviderFactories);
+        ITreeIndexFrameFactory leaf = new RTreeNSMLeafFrameFactory(writerFactory, valueProviderFactories);
+        IFreePageManager pageManager = new LinkedListFreePageManager(bufferCache, 0, new LIFOMetaDataFrameFactory());
+        return new RTree(bufferCache, typeTraits.length, cmpFactories, pageManager, interior, leaf);
+    }
+    /**
+     * Returns a MultiComparator for searchKey. A null key, or one with as many fields as cmpFactories, is
+     * delegated to MultiComparator.create(cmpFactories); otherwise fresh comparators are created from only
+     * the first searchKey.getFieldCount() factories.
+     */
+    public static MultiComparator getSearchMultiComparator(IBinaryComparatorFactory[] cmpFactories,
+            ITupleReference searchKey) {
+        final boolean useAllFactories = searchKey == null || cmpFactories.length == searchKey.getFieldCount();
+        if (!useAllFactories) {
+            final int keyFields = searchKey.getFieldCount();
+            IBinaryComparator[] prefixCmps = new IBinaryComparator[keyFields];
+            for (int k = 0; k < keyFields; k++) {
+                prefixCmps[k] = cmpFactories[k].createBinaryComparator();
+            }
+            return new MultiComparator(prefixCmps);
+        }
+        return MultiComparator.create(cmpFactories);
+    }
+    /** Returns an array of len PointablePrimitiveValueProviderFactory instances, each constructed from pf. */
+    public static IPrimitiveValueProviderFactory[] createPrimitiveValueProviderFactories(int len, IPointableFactory pf) {
+        final IPrimitiveValueProviderFactory[] factories = new IPrimitiveValueProviderFactory[len];
+        for (int slot = 0; slot < len; slot++) {
+            factories[slot] = new PointablePrimitiveValueProviderFactory(pf);
+        }
+        return factories;
+    }
+}
\ No newline at end of file