Merged hyracks_dev_next into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@531 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-am-rtree/.classpath b/hyracks/hyracks-storage-am-rtree/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks/hyracks-storage-am-rtree/.project b/hyracks/hyracks-storage-am-rtree/.project
new file mode 100644
index 0000000..4989318
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>hyracks-storage-am-rtree</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks/hyracks-storage-am-rtree/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-storage-am-rtree/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..375e12e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Fri May 20 19:34:07 PDT 2011
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-storage-am-rtree/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-storage-am-rtree/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..ba4f536
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Wed Feb 02 19:49:19 PST 2011
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks/hyracks-storage-am-rtree/pom.xml b/hyracks/hyracks-storage-am-rtree/pom.xml
new file mode 100644
index 0000000..86328f75
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/pom.xml
@@ -0,0 +1,63 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-storage-am-rtree</artifactId>
+  <version>0.1.7-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.1.7-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-am-common</artifactId>
+  		<version>0.1.7-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-common</artifactId>
+  		<version>0.1.7-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.1.7-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-control-nc</artifactId>
+  		<version>0.1.7-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  		
+  	<dependency>
+  		<groupId>junit</groupId>
+  		<artifactId>junit</artifactId>
+  		<version>4.8.1</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>  	  		
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
new file mode 100644
index 0000000..ebc17b5
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
+
+public interface IRTreeFrame extends ITreeIndexFrame {
+    /** Creates a tuple reference for the tuples stored in this kind of frame. */
+    public ITreeIndexTupleReference createTupleReference();
+    /** NOTE(review): presumably a split-heuristic step over entries between start and end, using rec - confirm. */
+    public void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end);
+    /** Presumably stores the minimum bounding rectangle (MBR) of this frame's tuples in splitKey - confirm. */
+    public void computeMBR(ISplitKey splitKey, MultiComparator cmp);
+    /** Inserts tuple into the frame; tupleIndex presumably identifies the target slot - confirm. */
+    public void insert(ITupleReference tuple, MultiComparator cmp, int tupleIndex) throws Exception;
+    /** Deletes the tuple at tupleIndex from the frame. */
+    public void delete(int tupleIndex, MultiComparator cmp) throws Exception;
+    /** Returns the page's NSN (presumably a node sequence number used to detect splits - confirm). */
+    public int getPageNsn();
+    /** Sets the page's NSN. */
+    public void setPageNsn(int pageNsn);
+    /** Returns the page id recorded as this page's right page (presumably its right sibling - confirm). */
+    public int getRightPage();
+    /** Sets the page id recorded as this page's right page. */
+    public void setRightPage(int rightPage);
+    /** Presumably recomputes the frame's MBR into the given tuple references - confirm. */
+    public void adjustMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp);
+
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
new file mode 100644
index 0000000..824c6b0
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeInteriorFrame.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.PathList;
+
+public interface IRTreeInteriorFrame extends IRTreeFrame {
+    /** Presumably picks the child entry best suited for tuple; meaning of the result not visible here - confirm. */
+    public boolean findBestChild(ITupleReference tuple, MultiComparator cmp);
+    /** Returns the page id of the best child (presumably the one chosen by findBestChild - confirm). */
+    public int getBestChildPageId(MultiComparator cmp);
+    /** Presumably returns the child page id of entry tupleIndex when that entry intersects tuple - confirm. */
+    public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+    /** Presumably returns the index of the entry whose child pointer matches the one in tuple - confirm. */
+    public int findTupleByPointer(ITupleReference tuple, MultiComparator cmp);
+    /** Variant taking traverseList and parentId; presumably records entries visited on the way - confirm. */
+    public int findTupleByPointer(ITupleReference tuple, PathList traverseList, int parentId, MultiComparator cmp);
+    /** Presumably replaces the key of the entry at tupleIndex with the key of tuple - confirm. */
+    public void adjustKey(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+    /** Presumably recomputes an MBR around entry tupleIndex; meaning of the result not visible here - confirm. */
+    public boolean recomputeMBR(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+    /** Presumably enlarges the previously chosen entry's MBR so that it covers tuple - confirm. */
+    public void enlarge(ITupleReference tuple, MultiComparator cmp);
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
new file mode 100644
index 0000000..c85712d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeLeafFrame.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.api;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public interface IRTreeLeafFrame extends IRTreeFrame {
+    /** Presumably returns the index of the entry matching tuple; the not-found value is not visible here - confirm. */
+    public int findTupleIndex(ITupleReference tuple, MultiComparator cmp);
+    /** Presumably reports whether the entry at tupleIndex intersects tuple - confirm. */
+    public boolean intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeOpHelper.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeOpHelper.java
new file mode 100644
index 0000000..5b8931d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeOpHelper.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.ITreeIndexOperatorDescriptorHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexHelperOpenMode;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexOpHelper;
+import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.freepage.LinkedListFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+public class RTreeOpHelper extends TreeIndexOpHelper {
+    // Not assigned anywhere in this class.
+    protected MultiComparator interiorCmp;
+
+    // Passes every argument straight through to TreeIndexOpHelper.
+    public RTreeOpHelper(ITreeIndexOperatorDescriptorHelper opDesc, IHyracksStageletContext ctx, int partition,
+            IndexHelperOpenMode mode) {
+        super(opDesc, ctx, partition, mode);
+    }
+
+    // Creates an RTree on the operator's buffer cache; free pages are handled by a LinkedListFreePageManager.
+    public ITreeIndex createTreeIndex() throws HyracksDataException {
+        final IBufferCache cache = opDesc.getStorageManager().getBufferCache(ctx);
+        final ITreeIndexMetaDataFrameFactory metaFrames = new LIFOMetaDataFrameFactory();
+        final IFreePageManager pageManager = new LinkedListFreePageManager(cache, indexFileId, 0, metaFrames);
+        return new RTree(cache, pageManager, opDesc.getTreeIndexInteriorFactory(),
+                opDesc.getTreeIndexLeafFactory(), cmp);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeOpHelperFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeOpHelperFactory.java
new file mode 100644
index 0000000..b26f21a
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeOpHelperFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.ITreeIndexOpHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.ITreeIndexOperatorDescriptorHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexHelperOpenMode;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexOpHelper;
+
+public class RTreeOpHelperFactory implements ITreeIndexOpHelperFactory {
+    private static final long serialVersionUID = 1L;
+    // Creates an RTreeOpHelper from exactly the arguments it is given.
+    @Override
+    public TreeIndexOpHelper createTreeIndexOpHelper(ITreeIndexOperatorDescriptorHelper opDesc,
+            IHyracksStageletContext ctx, int partition, IndexHelperOpenMode mode) {
+        final TreeIndexOpHelper helper = new RTreeOpHelper(opDesc, ctx, partition, mode);
+        return helper;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
new file mode 100644
index 0000000..8aab763
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.ITreeIndexOpHelperFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class RTreeSearchOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private int[] keyFields; // positions of the input-tuple fields that form the search key
+
+    // Forwards the index configuration to the base descriptor and stores the search-key field positions.
+    public RTreeSearchOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<ITreeIndex> treeIndexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, ITypeTrait[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] keyFields, ITreeIndexOpHelperFactory opHelperFactory) {
+        super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider,
+                interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories, opHelperFactory);
+        this.keyFields = keyFields;
+    }
+
+    // Creates the search runtime for the given partition; env and nPartitions are not used here.
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, final IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new RTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, keyFields);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
new file mode 100644
index 0000000..6a13d9f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexHelperOpenMode;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexOpHelper;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeOpContext;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
+
+public class RTreeSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    // Runs one R-tree search per input tuple and forwards every tuple the cursor returns to the writer.
+    private TreeIndexOpHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
+
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
+
+    private RTree rtree;
+    private PermutingFrameTupleReference searchKey;
+    private SearchPredicate searchPred;
+    private MultiComparator cmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame interiorFrame;
+    private ITreeIndexFrame leafFrame;
+    private RTreeOpContext opCtx;
+
+    private RecordDescriptor recDesc;
+
+    // Creates the op helper in OPEN mode and, if key fields are given, the permuting search-key reference.
+    // A null or empty keyFields leaves searchKey null, which nextFrame does not guard against.
+    public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksStageletContext ctx,
+            int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
+        treeIndexOpHelper = opDesc.getTreeIndexOpHelperFactory().createTreeIndexOpHelper(opDesc, ctx, partition,
+                IndexHelperOpenMode.OPEN);
+
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        if (keyFields != null && keyFields.length > 0) {
+            searchKey = new PermutingFrameTupleReference();
+            searchKey.setFieldPermutation(keyFields);
+        }
+    }
+
+    // Initializes the tree index and allocates the search and output state used by nextFrame.
+    // Any failure deinitializes the op helper and is rethrown as a HyracksDataException.
+    @Override
+    public void open() throws HyracksDataException {
+        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
+                .getOperatorDescriptor();
+        int frameSize = treeIndexOpHelper.getHyracksStageletContext().getFrameSize();
+        accessor = new FrameTupleAccessor(frameSize, recDesc);
+
+        interiorFrame = opDesc.getTreeIndexInteriorFactory().createFrame();
+        leafFrame = opDesc.getTreeIndexLeafFactory().createFrame();
+        cursor = new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrame, (IRTreeLeafFrame) leafFrame);
+
+        try {
+            treeIndexOpHelper.init();
+            rtree = (RTree) treeIndexOpHelper.getTreeIndex();
+            // The search comparator works on its own copy of the tree's comparator array.
+            cmp = new MultiComparator(rtree.getCmp().getTypeTraits(), rtree.getCmp().getComparators().clone());
+            searchPred = new SearchPredicate(searchKey, cmp);
+
+            writeBuffer = treeIndexOpHelper.getHyracksStageletContext().allocateFrame();
+            tb = new ArrayTupleBuilder(rtree.getCmp().getFieldCount());
+            dos = tb.getDataOutput();
+            appender = new FrameTupleAppender(frameSize);
+            appender.reset(writeBuffer, true);
+            opCtx = rtree.createOpContext(IndexOp.SEARCH, treeIndexOpHelper.getLeafFrame(),
+                    treeIndexOpHelper.getInteriorFrame(), null);
+        } catch (Exception e) {
+            // Release the index, then report the failure instead of continuing half-initialized.
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    // Copies every tuple the cursor yields into the output frame, flushing the frame whenever an append fails.
+    private void writeSearchResults() throws Exception {
+        while (cursor.hasNext()) {
+            tb.reset();
+            cursor.next();
+            ITupleReference frameTuple = cursor.getTuple();
+            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                tb.addFieldEndOffset();
+            }
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+                appender.reset(writeBuffer, true);
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException("Search result tuple was rejected by a freshly reset output frame");
+                }
+            }
+        }
+    }
+
+    // Runs one R-tree search per tuple of the incoming frame and writes out the results of each search.
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount; i++) {
+                searchKey.reset(accessor, i);
+                searchPred.setSearchKey(searchKey);
+                cursor.reset();
+                rtree.search(cursor, searchPred, opCtx);
+                writeSearchResults();
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    // Flushes pending results, closes the writer and the cursor, and always deinitializes the op helper.
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            try {
+                flush();
+                writer.close();
+            } finally {
+                // The cursor is closed even when flushing or closing the writer failed.
+                try {
+                    cursor.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
+
+    // Sends the output frame downstream if it holds tuples, then resets it so they are not sent twice.
+    @Override
+    public void flush() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(writeBuffer, writer);
+            appender.reset(writeBuffer, true);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
new file mode 100644
index 0000000..17c10e5
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMFrame.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+
+public abstract class RTreeNSMFrame extends TreeIndexNSMFrame implements IRTreeFrame {
+    protected static final int pageNsnOff = smFlagOff + 1;
+    protected static final int rightPageOff = pageNsnOff + 4;
+
+    protected ITreeIndexTupleReference[] tuples;
+    protected ITreeIndexTupleReference cmpFrameTuple;
+    protected TupleEntryArrayList tupleEntries1; // used for split and checking
+                                                 // enlargement
+    protected TupleEntryArrayList tupleEntries2; // used for split
+
+    protected Rectangle[] rec;
+
+    protected static final double splitFactor = 0.4;
+    protected static final int nearMinimumOverlapFactor = 32;
+    private static final double doubleEpsilon = computeDoubleEpsilon();
+    private static final int numTuplesEntries = 100;
+
+    public RTreeNSMFrame(ITreeIndexTupleWriter tupleWriter, int keyFieldCount) {
+        super(tupleWriter, new UnorderedSlotManager());
+        this.tuples = new ITreeIndexTupleReference[keyFieldCount];
+        for (int i = 0; i < keyFieldCount; i++) {
+            this.tuples[i] = tupleWriter.createTupleReference();
+        }
+        cmpFrameTuple = tupleWriter.createTupleReference();
+
+        tupleEntries1 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
+        tupleEntries2 = new TupleEntryArrayList(numTuplesEntries, numTuplesEntries);
+        rec = new Rectangle[4];
+        for (int i = 0; i < 4; i++) {
+            rec[i] = new Rectangle(keyFieldCount / 2);
+        }
+    }
+
+    private static double computeDoubleEpsilon() {
+        double doubleEpsilon = 1.0;
+
+        do {
+            doubleEpsilon /= 2.0;
+        } while (1.0 + (doubleEpsilon / 2.0) != 1.0);
+        return doubleEpsilon;
+    }
+
+    public static double doubleEpsilon() {
+        return doubleEpsilon;
+    }
+
+    @Override
+    public void initBuffer(byte level) {
+        super.initBuffer(level);
+        buf.putInt(pageNsnOff, 0);
+        buf.putInt(rightPageOff, -1);
+    }
+
+    public void setTupleCount(int tupleCount) {
+        buf.putInt(tupleCountOff, tupleCount);
+    }
+
+    @Override
+    public void setPageNsn(int pageNsn) {
+        buf.putInt(pageNsnOff, pageNsn);
+    }
+
+    @Override
+    public int getPageNsn() {
+        return buf.getInt(pageNsnOff);
+    }
+
+    @Override
+    protected void resetSpaceParams() {
+        buf.putInt(freeSpaceOff, rightPageOff + 4);
+        buf.putInt(totalFreeSpaceOff, buf.capacity() - (rightPageOff + 4));
+    }
+
+    @Override
+    public int getRightPage() {
+        return buf.getInt(rightPageOff);
+    }
+
+    @Override
+    public void setRightPage(int rightPage) {
+        buf.putInt(rightPageOff, rightPage);
+    }
+
+    protected ITreeIndexTupleReference[] getTuples() {
+        return tuples;
+    }
+
+    // for debugging
+    public ArrayList<Integer> getChildren(MultiComparator cmp) {
+        ArrayList<Integer> ret = new ArrayList<Integer>();
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        int tupleCount = buf.getInt(tupleCountOff);
+        for (int i = 0; i < tupleCount; i++) {
+            int tupleOff = slotManager.getTupleOff(slotManager.getSlotOff(i));
+            frameTuple.resetByTupleOffset(buf, tupleOff);
+
+            int intVal = IntegerSerializerDeserializer.getInt(
+                    buf.array(),
+                    frameTuple.getFieldStart(frameTuple.getFieldCount() - 1)
+                            + frameTuple.getFieldLength(frameTuple.getFieldCount() - 1));
+            ret.add(intVal);
+        }
+        return ret;
+    }
+
+    public void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
+        int j = 0;
+        while (entries.get(j).getTupleIndex() == -1) {
+            j++;
+        }
+        frameTuple.resetByTupleIndex(this, entries.get(j).getTupleIndex());
+        rec.set(frameTuple);
+        for (int i = start; i < end; ++i) {
+            if (i != j) {
+                if (entries.get(i).getTupleIndex() != -1) {
+                    frameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
+                    rec.enlarge(frameTuple);
+                } else {
+                    rec.enlarge(tuple);
+                }
+            }
+        }
+    }
+
+    @Override
+    public ITreeIndexTupleReference createTupleReference() {
+        return tupleWriter.createTupleReference();
+    }
+
+    public void adjustMBRImpl(ITreeIndexTupleReference[] tuples, MultiComparator cmp) {
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 1; i < getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(this, i);
+            for (int j = 0; j < maxFieldPos; j++) {
+                int k = maxFieldPos + j;
+                int c = cmp.getComparators()[j].compare(frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                        frameTuple.getFieldLength(j), tuples[j].getFieldData(j), tuples[j].getFieldStart(j),
+                        tuples[j].getFieldLength(j));
+                if (c < 0) {
+                    tuples[j].resetByTupleIndex(this, i);
+                }
+                c = cmp.getComparators()[k].compare(frameTuple.getFieldData(k), frameTuple.getFieldStart(k),
+                        frameTuple.getFieldLength(k), tuples[k].getFieldData(k), tuples[k].getFieldStart(k),
+                        tuples[k].getFieldLength(k));
+                if (c > 0) {
+                    tuples[k].resetByTupleIndex(this, i);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void computeMBR(ISplitKey splitKey, MultiComparator cmp) {
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        frameTuple.setFieldCount(cmp.getFieldCount());
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, cmp.getKeyFieldCount());
+
+        splitKey.initData(splitKeySize);
+        this.adjustMBR(tuples, cmp);
+        rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
+    }
+
+    @Override
+    public int getPageHeaderSize() {
+        return rightPageOff + 4; // include the 4-byte right-page pointer; tuple space starts here (resetSpaceParams)
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
new file mode 100644
index 0000000..d03c5e9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrame.java
@@ -0,0 +1,645 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.SlotOffTupleOff;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.PathList;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+
+public class RTreeNSMInteriorFrame extends RTreeNSMFrame implements IRTreeInteriorFrame {
+
+    private static final int childPtrSize = 4;
+
+    public RTreeNSMInteriorFrame(ITreeIndexTupleWriter tupleWriter, int keyFieldCount) {
+        super(tupleWriter, keyFieldCount);
+    }
+
+    @Override
+    public String printKeys(MultiComparator cmp, ISerializerDeserializer[] fields) throws HyracksDataException {
+        StringBuilder strBuilder = new StringBuilder();
+        int tupleCount = buf.getInt(tupleCountOff);
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        for (int i = 0; i < tupleCount; i++) {
+            frameTuple.resetByTupleIndex(this, i);
+            for (int j = 0; j < cmp.getKeyFieldCount(); j++) {
+                ByteArrayInputStream inStream = new ByteArrayInputStream(frameTuple.getFieldData(j),
+                        frameTuple.getFieldStart(j), frameTuple.getFieldLength(j));
+                DataInput dataIn = new DataInputStream(inStream);
+                Object o = fields[j].deserialize(dataIn);
+                strBuilder.append(o.toString() + " ");
+            }
+            strBuilder.append(" | ");
+        }
+        strBuilder.append("\n");
+        return strBuilder.toString();
+    }
+
+    @Override
+    public boolean findBestChild(ITupleReference tuple, MultiComparator cmp) {
+        cmpFrameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        // on return frameTuple is positioned on the chosen child; true means its enlargement is positive
+        int bestChild = 0;
+        double minEnlargedArea = Double.MAX_VALUE;
+
+        // the children pointers in the node point to leaves
+        if (getLevel() == 1) {
+            // record every child's enlargement; only if even the best child must grow is the least-overlap
+            // choice made below (ties: minimum enlarged area, then minimum area)
+            for (int i = 0; i < getTupleCount(); ++i) {
+                frameTuple.resetByTupleIndex(this, i);
+                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+                tupleEntries1.add(i, enlargedArea);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                }
+            }
+            if (minEnlargedArea < -RTreeNSMFrame.doubleEpsilon() || minEnlargedArea > RTreeNSMFrame.doubleEpsilon()) {
+                minEnlargedArea = Double.MAX_VALUE; // |enlargement| > epsilon: no child fits as is, redo the choice
+                int k;
+                if (getTupleCount() > nearMinimumOverlapFactor) {
+                    // sort the entries based on their area enlargement needed
+                    // to include the object
+                    tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount());
+                    k = nearMinimumOverlapFactor;
+                } else {
+                    k = getTupleCount();
+                }
+
+                double minOverlap = Double.MAX_VALUE;
+                int id = 0;
+                for (int i = 0; i < k; ++i) {
+                    double difference = 0.0;
+                    for (int j = 0; j < getTupleCount(); ++j) {
+                        frameTuple.resetByTupleIndex(this, j);
+                        cmpFrameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
+
+                        int c = pointerCmp(frameTuple, cmpFrameTuple, cmp);
+                        if (c != 0) {
+                            double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple, cmp);
+                            if (intersection != 0.0) {
+                                difference += intersection - overlappedArea(frameTuple, null, cmpFrameTuple, cmp);
+                            }
+                        } else {
+                            id = j;
+                        }
+                    }
+
+                    double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
+                    if (difference < minOverlap) {
+                        minOverlap = difference;
+                        minEnlargedArea = enlargedArea;
+                        bestChild = id;
+                    } else if (difference == minOverlap) {
+                        if (enlargedArea < minEnlargedArea) {
+                            minEnlargedArea = enlargedArea;
+                            bestChild = id;
+                        } else if (enlargedArea == minEnlargedArea) {
+                            double area = area(cmpFrameTuple, cmp);
+                            frameTuple.resetByTupleIndex(this, bestChild);
+                            double minArea = area(frameTuple, cmp);
+                            if (area < minArea) {
+                                bestChild = id;
+                            }
+                        }
+                    }
+                }
+            }
+        } else { // find minimum enlarged area, use minimum area to break tie
+            for (int i = 0; i < getTupleCount(); i++) {
+                frameTuple.resetByTupleIndex(this, i);
+                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                } else if (enlargedArea == minEnlargedArea) {
+                    double area = area(frameTuple, cmp);
+                    frameTuple.resetByTupleIndex(this, bestChild);
+                    double minArea = area(frameTuple, cmp);
+                    if (area < minArea) {
+                        bestChild = i;
+                    }
+                }
+            }
+        }
+        tupleEntries1.clear();
+
+        frameTuple.resetByTupleIndex(this, bestChild);
+        if (minEnlargedArea > 0.0) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int getBestChildPageId(MultiComparator cmp) {
+        return buf.getInt(getChildPointerOff(frameTuple, cmp));
+    }
+
+    @Override
+    public int findTupleByPointer(ITupleReference tuple, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        for (int i = 0; i < getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(this, i);
+            int c = pointerCmp(frameTuple, tuple, cmp);
+            if (c == 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+        int maxFieldPos = cmp.getKeyFieldCount() / 2; // field i is paired with field maxFieldPos + i
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                    frameTuple.getFieldLength(j));
+            if (c > 0) {
+                return -1; // tuple's field i lies above the entry's field j: reject this entry
+            }
+            c = cmp.getComparators()[i].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+            if (c < 0) {
+                return -1; // tuple's field j lies below the entry's field i: reject this entry
+            }
+        }
+        // no field pair rejected the entry: return the int stored right after its last key field
+        return buf.getInt(getChildPointerOff(frameTuple, cmp));
+    }
+
+    @Override
+    public int findTupleByPointer(ITupleReference tuple, PathList traverseList, int parentIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        for (int i = 0; i < getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(this, i);
+
+            int c = pointerCmp(frameTuple, tuple, cmp);
+            if (c == 0) {
+                return i;
+            } else {
+                int pageId = IntegerSerializerDeserializer.getInt(frameTuple.getFieldData(cmp.getKeyFieldCount() - 1),
+                        getChildPointerOff(frameTuple, cmp));
+                traverseList.add(pageId, -1, parentIndex);
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public boolean compact(MultiComparator cmp) {
+        resetSpaceParams();
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+
+        int tupleCount = buf.getInt(tupleCountOff);
+        int freeSpace = buf.getInt(freeSpaceOff);
+
+        ArrayList<SlotOffTupleOff> sortedTupleOffs = new ArrayList<SlotOffTupleOff>();
+        sortedTupleOffs.ensureCapacity(tupleCount);
+        for (int i = 0; i < tupleCount; i++) {
+            int slotOff = slotManager.getSlotOff(i);
+            int tupleOff = slotManager.getTupleOff(slotOff);
+            sortedTupleOffs.add(new SlotOffTupleOff(i, slotOff, tupleOff));
+        }
+        Collections.sort(sortedTupleOffs);
+
+        for (int i = 0; i < sortedTupleOffs.size(); i++) {
+            int tupleOff = sortedTupleOffs.get(i).tupleOff;
+            frameTuple.resetByTupleOffset(buf, tupleOff);
+
+            int tupleEndOff = frameTuple.getFieldStart(frameTuple.getFieldCount() - 1)
+                    + frameTuple.getFieldLength(frameTuple.getFieldCount() - 1);
+            int tupleLength = tupleEndOff - tupleOff + childPtrSize;
+            System.arraycopy(buf.array(), tupleOff, buf.array(), freeSpace, tupleLength);
+
+            slotManager.setSlot(sortedTupleOffs.get(i).slotOff, freeSpace);
+            freeSpace += tupleLength;
+        }
+
+        buf.putInt(freeSpaceOff, freeSpace);
+        buf.putInt(totalFreeSpaceOff, buf.capacity() - freeSpace - tupleCount * slotManager.getSlotSize());
+
+        return false;
+    }
+
+    @Override
+    public FrameOpSpaceStatus hasSpaceInsert(ITupleReference tuple, MultiComparator cmp) {
+        int bytesRequired = tupleWriter.bytesRequired(tuple) + childPtrSize; // for
+                                                                             // the
+                                                                             // child
+                                                                             // pointer
+        if (bytesRequired + slotManager.getSlotSize() <= buf.capacity() - buf.getInt(freeSpaceOff)
+                - (buf.getInt(tupleCountOff) * slotManager.getSlotSize()))
+            return FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;
+        else if (bytesRequired + slotManager.getSlotSize() <= buf.getInt(totalFreeSpaceOff))
+            return FrameOpSpaceStatus.SUFFICIENT_SPACE;
+        else
+            return FrameOpSpaceStatus.INSUFFICIENT_SPACE;
+    }
+
+    @Override
+    public void adjustKey(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        if (tupleIndex == -1) {
+            tupleIndex = findTupleByPointer(tuple, cmp);
+        }
+        if (tupleIndex != -1) {
+            tupleWriter.writeTuple(tuple, buf, getTupleOffset(tupleIndex));
+        }
+    }
+
+    private int pointerCmp(ITupleReference tupleA, ITupleReference tupleB, MultiComparator cmp) {
+        return cmp.getIntCmp().compare(tupleA.getFieldData(cmp.getKeyFieldCount() - 1),
+                getChildPointerOff(tupleA, cmp), childPtrSize, tupleB.getFieldData(cmp.getKeyFieldCount() - 1),
+                getChildPointerOff(tupleB, cmp), childPtrSize);
+    }
+
+    @Override
+    public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey)
+            throws Exception {
+
+        rightFrame.setPageTupleFieldCount(cmp.getKeyFieldCount());
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+
+        // calculations are based on the R*-tree paper
+        int m = (int) Math.floor((getTupleCount() + 1) * splitFactor);
+        int splitDistribution = getTupleCount() - (2 * m) + 2;
+
+        // to calculate the minimum margin in order to pick the split axis
+        double minMargin = Double.MAX_VALUE;
+        int splitAxis = 0, sortOrder = 0;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            for (int k = 0; k < getTupleCount(); ++k) {
+
+                frameTuple.resetByTupleIndex(this, k);
+
+                double LowerKey = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(i),
+                        frameTuple.getFieldStart(i));
+                double UpperKey = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(j),
+                        frameTuple.getFieldStart(j));
+
+                tupleEntries1.add(k, LowerKey);
+                tupleEntries2.add(k, UpperKey);
+            }
+            double LowerKey = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i));
+            double UpperKey = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j));
+
+            tupleEntries1.add(-1, LowerKey);
+            tupleEntries2.add(-1, UpperKey);
+
+            tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+            tupleEntries2.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+
+            double lowerMargin = 0.0, upperMargin = 0.0;
+            // generate distribution
+            for (int k = 1; k <= splitDistribution; ++k) {
+                int d = m - 1 + k;
+
+                generateDist(tuple, tupleEntries1, rec[0], 0, d);
+                generateDist(tuple, tupleEntries2, rec[1], 0, d);
+                generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
+                generateDist(tuple, tupleEntries2, rec[3], d, getTupleCount() + 1);
+
+                // calculate the margin of the distributions
+                lowerMargin += rec[0].margin() + rec[2].margin();
+                upperMargin += rec[1].margin() + rec[3].margin();
+            }
+            double margin = Math.min(lowerMargin, upperMargin);
+
+            // store minimum margin as split axis
+            if (margin < minMargin) {
+                minMargin = margin;
+                splitAxis = i;
+                sortOrder = (lowerMargin < upperMargin) ? 0 : 2;
+            }
+
+            tupleEntries1.clear();
+            tupleEntries2.clear();
+        }
+
+        for (int i = 0; i < getTupleCount(); ++i) {
+            frameTuple.resetByTupleIndex(this, i);
+            double key = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(splitAxis + sortOrder),
+                    frameTuple.getFieldStart(splitAxis + sortOrder));
+            tupleEntries1.add(i, key);
+        }
+        double key = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(splitAxis + sortOrder),
+                tuple.getFieldStart(splitAxis + sortOrder));
+        tupleEntries1.add(-1, key);
+        tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+
+        double minArea = Double.MAX_VALUE;
+        double minOverlap = Double.MAX_VALUE;
+        int splitPoint = 0;
+        for (int i = 1; i <= splitDistribution; ++i) {
+            int d = m - 1 + i;
+
+            generateDist(tuple, tupleEntries1, rec[0], 0, d);
+            generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
+
+            double overlap = rec[0].overlappedArea(rec[2]);
+            if (overlap < minOverlap) {
+                splitPoint = d;
+                minOverlap = overlap;
+                minArea = rec[0].area() + rec[2].area();
+            } else if (overlap == minOverlap) {
+                double area = rec[0].area() + rec[2].area();
+                if (area < minArea) {
+                    splitPoint = d;
+                    minArea = area;
+                }
+            }
+        }
+        int startIndex, endIndex;
+        if (splitPoint < (getTupleCount() + 1) / 2) {
+            startIndex = 0;
+            endIndex = splitPoint;
+        } else {
+            startIndex = splitPoint;
+            endIndex = (getTupleCount() + 1);
+        }
+        boolean tupleInserted = false;
+        int totalBytes = 0, numOfDeletedTuples = 0;
+        for (int i = startIndex; i < endIndex; i++) {
+            if (tupleEntries1.get(i).getTupleIndex() != -1) {
+                frameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
+                rightFrame.insert(frameTuple, cmp, -1);
+                ((UnorderedSlotManager) slotManager).modifySlot(
+                        slotManager.getSlotOff(tupleEntries1.get(i).getTupleIndex()), -1);
+                totalBytes += tupleWriter.bytesRequired(frameTuple) + childPtrSize;
+                numOfDeletedTuples++;
+            } else {
+                rightFrame.insert(tuple, cmp, -1);
+                tupleInserted = true;
+            }
+        }
+
+        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
+
+        // maintain space information
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
+                + (slotManager.getSlotSize() * numOfDeletedTuples));
+
+        // compact both pages
+        rightFrame.compact(cmp);
+        compact(cmp);
+
+        if (!tupleInserted) {
+            insert(tuple, cmp, -1);
+        }
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, cmp.getKeyFieldCount());
+
+        splitKey.initData(splitKeySize);
+        this.adjustMBR(tuples, cmp);
+        rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
+
+        ((IRTreeFrame) rightFrame).adjustMBR(((RTreeNSMFrame) rightFrame).getTuples(), cmp);
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+                rTreeSplitKey.getRightPageBuffer(), 0);
+        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
+
+        tupleEntries1.clear();
+        tupleEntries2.clear();
+        return 0;
+    }
+
+    private int getChildPointerOff(ITupleReference tuple, MultiComparator cmp) {
+        return tuple.getFieldStart(cmp.getKeyFieldCount() - 1) + tuple.getFieldLength(cmp.getKeyFieldCount() - 1);
+    }
+
+    @Override
+    public void insert(ITupleReference tuple, MultiComparator cmp, int tupleIndex) throws Exception {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        slotManager.insertSlot(-1, buf.getInt(freeSpaceOff));
+        int freeSpace = buf.getInt(freeSpaceOff);
+        int bytesWritten = tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(), buf, freeSpace);
+        System.arraycopy(tuple.getFieldData(cmp.getKeyFieldCount() - 1), getChildPointerOff(tuple, cmp), buf.array(),
+                freeSpace + bytesWritten, childPtrSize);
+        int tupleSize = bytesWritten + childPtrSize;
+
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) + 1);
+        buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + tupleSize);
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - tupleSize - slotManager.getSlotSize());
+
+    }
+
+    @Override
+    public void delete(int tupleIndex, MultiComparator cmp) throws Exception {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        int slotOff = slotManager.getSlotOff(tupleIndex);
+
+        int tupleOff = slotManager.getTupleOff(slotOff);
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int tupleSize = tupleWriter.bytesRequired(frameTuple);
+
+        // perform deletion (we just do a memcpy to overwrite the slot)
+        int slotStartOff = slotManager.getSlotEndOff();
+        int length = slotOff - slotStartOff;
+        System.arraycopy(buf.array(), slotStartOff, buf.array(), slotStartOff + slotManager.getSlotSize(), length);
+
+        // maintain space information
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) - 1);
+        buf.putInt(totalFreeSpaceOff,
+                buf.getInt(totalFreeSpaceOff) + tupleSize + childPtrSize + slotManager.getSlotSize());
+    }
+
+    /**
+     * Reports whether the key stored at {@code tupleIndex} differs from {@code tuple}.
+     * <p>
+     * Key fields are visited in pairs: for every {@code i} in the first half of the key
+     * fields, field {@code i} and field {@code i + keyFieldCount / 2} of both tuples are
+     * compared with the comparator registered for that field.
+     *
+     * @param tuple      the tuple to compare against the stored entry
+     * @param tupleIndex index of the stored entry in this frame
+     * @param cmp        supplies the key field count and the per-field comparators
+     * @return {@code true} as soon as one compared field differs, {@code false} if all are equal
+     */
+    @Override
+    public boolean recomputeMBR(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getKeyFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int lo = 0, hi = dims; lo < dims; lo++, hi++) {
+            // The second comparison only runs when the first one found the fields equal.
+            if (cmp.getComparators()[lo].compare(frameTuple.getFieldData(lo), frameTuple.getFieldStart(lo),
+                    frameTuple.getFieldLength(lo), tuple.getFieldData(lo), tuple.getFieldStart(lo),
+                    tuple.getFieldLength(lo)) != 0
+                    || cmp.getComparators()[hi].compare(frameTuple.getFieldData(hi), frameTuple.getFieldStart(hi),
+                            frameTuple.getFieldLength(hi), tuple.getFieldData(hi), tuple.getFieldStart(hi),
+                            tuple.getFieldLength(hi)) != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted, ITupleReference tuple2,
+            MultiComparator cmp) {
+        double area = 1.0;
+        double f1, f2;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh1, pLow1;
+            if (tupleToBeInserted != null) {
+                int c = cmp.getComparators()[i].compare(tuple1.getFieldData(i), tuple1.getFieldStart(i),
+                        tuple1.getFieldLength(i), tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i), tupleToBeInserted.getFieldLength(i));
+                if (c < 0) {
+                    pLow1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                } else {
+                    pLow1 = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(i),
+                            tupleToBeInserted.getFieldStart(i));
+                }
+
+                c = cmp.getComparators()[j].compare(tuple1.getFieldData(j), tuple1.getFieldStart(j),
+                        tuple1.getFieldLength(j), tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j), tupleToBeInserted.getFieldLength(j));
+                if (c > 0) {
+                    pHigh1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+                } else {
+                    pHigh1 = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(j),
+                            tupleToBeInserted.getFieldStart(j));
+                }
+            } else {
+                pLow1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                pHigh1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+            }
+
+            double pLow2 = DoubleSerializerDeserializer.getDouble(tuple2.getFieldData(i), tuple2.getFieldStart(i));
+            double pHigh2 = DoubleSerializerDeserializer.getDouble(tuple2.getFieldData(j), tuple2.getFieldStart(j));
+
+            if (pLow1 > pHigh2 || pHigh1 < pLow2) {
+                return 0.0;
+            }
+
+            f1 = Math.max(pLow1, pLow2);
+            f2 = Math.min(pHigh1, pHigh2);
+            area *= f2 - f1;
+        }
+        return area;
+    }
+
+    private double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
+        double areaBeforeEnlarge = area(tuple, cmp);
+        double areaAfterEnlarge = 1.0;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh, pLow;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), tupleToBeInserted.getFieldData(i), tupleToBeInserted.getFieldStart(i),
+                    tupleToBeInserted.getFieldLength(i));
+            if (c < 0) {
+                pLow = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i));
+            } else {
+                pLow = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i));
+            }
+
+            c = cmp.getComparators()[j].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    tupleToBeInserted.getFieldData(j), tupleToBeInserted.getFieldStart(j),
+                    tupleToBeInserted.getFieldLength(j));
+            if (c > 0) {
+                pHigh = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j));
+            } else {
+                pHigh = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j));
+            }
+            areaAfterEnlarge *= pHigh - pLow;
+        }
+        return areaAfterEnlarge - areaBeforeEnlarge;
+    }
+
+    private double area(ITupleReference tuple, MultiComparator cmp) {
+        double area = 1.0;
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            area *= DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j))
+                    - DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i));
+        }
+        return area;
+    }
+
+    /**
+     * Extends, in place, the rectangle currently referenced by {@code frameTuple} so that it
+     * also covers {@code tuple}. This method does not reposition {@code frameTuple}; it works
+     * on whatever entry it already points to.
+     * <p>
+     * For each pair of key fields ({@code i}, {@code i + keyFieldCount / 2}): the stored
+     * first-half field is overwritten when it compares greater than the incoming one, and the
+     * stored second-half field is overwritten when it compares less than the incoming one.
+     */
+    @Override
+    public void enlarge(ITupleReference tuple, MultiComparator cmp) {
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int lo = 0, hi = dims; lo < dims; lo++, hi++) {
+            final boolean storedLowTooLarge = cmp.getComparators()[lo].compare(frameTuple.getFieldData(lo),
+                    frameTuple.getFieldStart(lo), frameTuple.getFieldLength(lo), tuple.getFieldData(lo),
+                    tuple.getFieldStart(lo), tuple.getFieldLength(lo)) > 0;
+            if (storedLowTooLarge) {
+                System.arraycopy(tuple.getFieldData(lo), tuple.getFieldStart(lo), frameTuple.getFieldData(lo),
+                        frameTuple.getFieldStart(lo), tuple.getFieldLength(lo));
+            }
+
+            final boolean storedHighTooSmall = cmp.getComparators()[hi].compare(frameTuple.getFieldData(hi),
+                    frameTuple.getFieldStart(hi), frameTuple.getFieldLength(hi), tuple.getFieldData(hi),
+                    tuple.getFieldStart(hi), tuple.getFieldLength(hi)) < 0;
+            if (storedHighTooSmall) {
+                System.arraycopy(tuple.getFieldData(hi), tuple.getFieldStart(hi), frameTuple.getFieldData(hi),
+                        frameTuple.getFieldStart(hi), tuple.getFieldLength(hi));
+            }
+        }
+    }
+
+    /**
+     * Prepares every tuple reference in {@code tuples} to read key fields only and points it
+     * at the first entry (index 0) of this frame, then delegates to {@code adjustMBRImpl}.
+     */
+    @Override
+    public void adjustMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp) {
+        for (ITreeIndexTupleReference ref : tuples) {
+            ref.setFieldCount(cmp.getKeyFieldCount());
+            ref.resetByTupleIndex(this, 0);
+        }
+
+        adjustMBRImpl(tuples, cmp);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
new file mode 100644
index 0000000..9fd5a54
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMInteriorFrameFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+
+/**
+ * Factory producing {@link RTreeNSMInteriorFrame} instances. Each created frame receives a
+ * fresh tuple writer from the configured writer factory together with the configured key
+ * field count.
+ */
+public class RTreeNSMInteriorFrameFactory implements ITreeIndexFrameFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    // Both fields are assigned once in the constructor and never change afterwards.
+    private final ITreeIndexTupleWriterFactory tupleWriterFactory;
+    private final int keyFieldCount;
+
+    /**
+     * @param tupleWriterFactory source of the tuple writer handed to every created frame
+     * @param keyFieldCount      key field count passed to every created frame
+     */
+    public RTreeNSMInteriorFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory, int keyFieldCount) {
+        this.tupleWriterFactory = tupleWriterFactory;
+        this.keyFieldCount = keyFieldCount;
+    }
+
+    /**
+     * Creates a new interior frame backed by a newly created tuple writer.
+     */
+    @Override
+    public IRTreeInteriorFrame createFrame() {
+        return new RTreeNSMInteriorFrame(tupleWriterFactory.createTupleWriter(), keyFieldCount);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
new file mode 100644
index 0000000..7ebe974
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrame.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
+import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
+
+public class RTreeNSMLeafFrame extends RTreeNSMFrame implements IRTreeLeafFrame {
+
+    /**
+     * Creates a leaf frame; both arguments are passed unchanged to the superclass constructor.
+     *
+     * @param tupleWriter   tuple writer forwarded to the superclass
+     * @param keyFieldCount key field count forwarded to the superclass
+     */
+    public RTreeNSMLeafFrame(ITreeIndexTupleWriter tupleWriter, int keyFieldCount) {
+        super(tupleWriter, keyFieldCount);
+    }
+
+    /**
+     * Looks up {@code tuple} in this leaf by delegating to the slot manager, after sizing
+     * {@code frameTuple} to the comparator's full field count.
+     *
+     * @return the index reported by the slot manager for {@code tuple}
+     */
+    @Override
+    public int findTupleIndex(ITupleReference tuple, MultiComparator cmp) {
+        final int fieldCount = cmp.getFieldCount();
+        frameTuple.setFieldCount(fieldCount);
+        final int index = slotManager.findTupleIndex(tuple, frameTuple, cmp, null, null);
+        return index;
+    }
+
+    /**
+     * Tests whether the rectangle in {@code tuple} intersects the entry stored at
+     * {@code tupleIndex}. Key fields are paired as ({@code i}, {@code i + keyFieldCount / 2}).
+     * <p>
+     * The test fails for a dimension when {@code tuple}'s field {@code i} compares greater
+     * than the stored field {@code i + keyFieldCount / 2}, or when {@code tuple}'s field
+     * {@code i + keyFieldCount / 2} compares less than the stored field {@code i}. Both
+     * comparisons use the comparator registered for field {@code i}.
+     *
+     * @return {@code false} as soon as one dimension fails the test, otherwise {@code true}
+     */
+    @Override
+    public boolean intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+
+        final int dims = cmp.getKeyFieldCount() / 2;
+        for (int lo = 0, hi = dims; lo < dims; lo++, hi++) {
+            // The second comparison only runs when the first one did not already rule the pair out.
+            if (cmp.getComparators()[lo].compare(tuple.getFieldData(lo), tuple.getFieldStart(lo),
+                    tuple.getFieldLength(lo), frameTuple.getFieldData(hi), frameTuple.getFieldStart(hi),
+                    frameTuple.getFieldLength(hi)) > 0
+                    || cmp.getComparators()[lo].compare(tuple.getFieldData(hi), tuple.getFieldStart(hi),
+                            tuple.getFieldLength(hi), frameTuple.getFieldData(lo), frameTuple.getFieldStart(lo),
+                            frameTuple.getFieldLength(lo)) < 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey)
+            throws Exception {
+
+        rightFrame.setPageTupleFieldCount(cmp.getFieldCount());
+        frameTuple.setFieldCount(cmp.getFieldCount());
+
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+
+        // calculations are based on the R*-tree paper
+        int m = (int) Math.floor((getTupleCount() + 1) * splitFactor);
+        int splitDistribution = getTupleCount() - (2 * m) + 2;
+
+        // to calculate the minimum margin in order to pick the split axis
+        double minMargin = Double.MAX_VALUE;
+        int splitAxis = 0, sortOrder = 0;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            for (int k = 0; k < getTupleCount(); ++k) {
+
+                frameTuple.resetByTupleIndex(this, k);
+
+                double LowerKey = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(i),
+                        frameTuple.getFieldStart(i));
+                double UpperKey = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(j),
+                        frameTuple.getFieldStart(j));
+
+                tupleEntries1.add(k, LowerKey);
+                tupleEntries2.add(k, UpperKey);
+            }
+            double LowerKey = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i));
+            double UpperKey = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j));
+
+            tupleEntries1.add(-1, LowerKey);
+            tupleEntries2.add(-1, UpperKey);
+
+            tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+            tupleEntries2.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+
+            double lowerMargin = 0.0, upperMargin = 0.0;
+            // generate distribution
+            for (int k = 1; k <= splitDistribution; ++k) {
+                int d = m - 1 + k;
+
+                generateDist(tuple, tupleEntries1, rec[0], 0, d);
+                generateDist(tuple, tupleEntries2, rec[1], 0, d);
+                generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
+                generateDist(tuple, tupleEntries2, rec[3], d, getTupleCount() + 1);
+
+                // calculate the margin of the distributions
+                lowerMargin += rec[0].margin() + rec[2].margin();
+                upperMargin += rec[1].margin() + rec[3].margin();
+            }
+            double margin = Math.min(lowerMargin, upperMargin);
+
+            // store minimum margin as split axis
+            if (margin < minMargin) {
+                minMargin = margin;
+                splitAxis = i;
+                sortOrder = (lowerMargin < upperMargin) ? 0 : 2;
+            }
+
+            tupleEntries1.clear();
+            tupleEntries2.clear();
+        }
+
+        for (int i = 0; i < getTupleCount(); ++i) {
+            frameTuple.resetByTupleIndex(this, i);
+            double key = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(splitAxis + sortOrder),
+                    frameTuple.getFieldStart(splitAxis + sortOrder));
+            tupleEntries1.add(i, key);
+        }
+        double key = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(splitAxis + sortOrder),
+                tuple.getFieldStart(splitAxis + sortOrder));
+        tupleEntries1.add(-1, key);
+        tupleEntries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+
+        double minArea = Double.MAX_VALUE;
+        double minOverlap = Double.MAX_VALUE;
+        int splitPoint = 0;
+        for (int i = 1; i <= splitDistribution; ++i) {
+            int d = m - 1 + i;
+
+            generateDist(tuple, tupleEntries1, rec[0], 0, d);
+            generateDist(tuple, tupleEntries1, rec[2], d, getTupleCount() + 1);
+
+            double overlap = rec[0].overlappedArea(rec[2]);
+            if (overlap < minOverlap) {
+                splitPoint = d;
+                minOverlap = overlap;
+                minArea = rec[0].area() + rec[2].area();
+            } else if (overlap == minOverlap) {
+                double area = rec[0].area() + rec[2].area();
+                if (area < minArea) {
+                    splitPoint = d;
+                    minArea = area;
+                }
+            }
+        }
+        int startIndex, endIndex;
+        if (splitPoint < (getTupleCount() + 1) / 2) {
+            startIndex = 0;
+            endIndex = splitPoint;
+        } else {
+            startIndex = splitPoint;
+            endIndex = (getTupleCount() + 1);
+        }
+        boolean tupleInserted = false;
+        int totalBytes = 0, numOfDeletedTuples = 0;
+        for (int i = startIndex; i < endIndex; i++) {
+            if (tupleEntries1.get(i).getTupleIndex() != -1) {
+                frameTuple.resetByTupleIndex(this, tupleEntries1.get(i).getTupleIndex());
+                rightFrame.insert(frameTuple, cmp, -1);
+                ((UnorderedSlotManager) slotManager).modifySlot(
+                        slotManager.getSlotOff(tupleEntries1.get(i).getTupleIndex()), -1);
+                totalBytes += tupleWriter.bytesRequired(frameTuple);
+                numOfDeletedTuples++;
+            } else {
+                rightFrame.insert(tuple, cmp, -1);
+                tupleInserted = true;
+            }
+        }
+
+        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
+
+        // maintain space information
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
+                + (slotManager.getSlotSize() * numOfDeletedTuples));
+
+        // compact both pages
+        rightFrame.compact(cmp);
+        compact(cmp);
+
+        if (!tupleInserted) {
+            insert(tuple, cmp, -1);
+        }
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, cmp.getKeyFieldCount());
+
+        splitKey.initData(splitKeySize);
+        this.adjustMBR(tuples, cmp);
+        rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+        rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
+
+        ((IRTreeFrame) rightFrame).adjustMBR(((RTreeNSMFrame) rightFrame).getTuples(), cmp);
+        rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
+                rTreeSplitKey.getRightPageBuffer(), 0);
+        rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
+
+        tupleEntries1.clear();
+        tupleEntries2.clear();
+        return 0;
+    }
+
+    /**
+     * Appends {@code tuple} to this leaf: a new slot is registered for the current
+     * free-space offset, the tuple is written there, and the tuple count, free-space offset
+     * and total-free-space counter are updated.
+     *
+     * @param tuple      the tuple to store
+     * @param cmp        supplies the field count
+     * @param tupleIndex not used by this implementation
+     */
+    @Override
+    public void insert(ITupleReference tuple, MultiComparator cmp, int tupleIndex) throws Exception {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+
+        slotManager.insertSlot(-1, buf.getInt(freeSpaceOff));
+        final int writeOff = buf.getInt(freeSpaceOff);
+        final int written = tupleWriter.writeTuple(tuple, buf, writeOff);
+
+        // Maintain the page's bookkeeping counters.
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) + 1);
+        buf.putInt(freeSpaceOff, writeOff + written);
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - (written + slotManager.getSlotSize()));
+    }
+
+    /**
+     * Removes the entry at {@code tupleIndex} from this leaf.
+     * <p>
+     * Only the slot directory is rewritten: the slots lying between the directory end and the
+     * removed slot are shifted by one slot width, which overwrites the removed slot. The tuple
+     * bytes themselves are not touched here; their size and the slot size are added back to
+     * the total-free-space counter.
+     *
+     * @param tupleIndex index of the entry to remove
+     * @param cmp        supplies the field count of the stored tuple
+     */
+    @Override
+    public void delete(int tupleIndex, MultiComparator cmp) throws Exception {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+
+        // Position frameTuple on the victim so that its size can be measured.
+        final int victimSlotOff = slotManager.getSlotOff(tupleIndex);
+        frameTuple.resetByTupleOffset(buf, slotManager.getTupleOff(victimSlotOff));
+        final int tupleBytes = tupleWriter.bytesRequired(frameTuple);
+
+        // Overwrite the victim's slot by shifting the slots in front of it (one memcpy).
+        final int dirEndOff = slotManager.getSlotEndOff();
+        final int bytesToShift = victimSlotOff - dirEndOff;
+        System.arraycopy(buf.array(), dirEndOff, buf.array(), dirEndOff + slotManager.getSlotSize(), bytesToShift);
+
+        // Maintain the page's bookkeeping counters.
+        buf.putInt(tupleCountOff, buf.getInt(tupleCountOff) - 1);
+        final int reclaimed = tupleBytes + slotManager.getSlotSize();
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + reclaimed);
+    }
+
+    /**
+     * Sizes every tuple reference in {@code tuples} to the comparator's full field count and
+     * points it at the first entry (index 0) of this frame, then delegates to
+     * {@code adjustMBRImpl}.
+     */
+    @Override
+    public void adjustMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp) {
+        for (ITreeIndexTupleReference ref : tuples) {
+            ref.setFieldCount(cmp.getFieldCount());
+            ref.resetByTupleIndex(this, 0);
+        }
+
+        adjustMBRImpl(tuples, cmp);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
new file mode 100644
index 0000000..7da7f26
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreeNSMLeafFrameFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.frames;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+
+/**
+ * Factory producing {@link RTreeNSMLeafFrame} instances. Each created frame receives a fresh
+ * tuple writer from the configured writer factory together with the configured key field
+ * count.
+ */
+public class RTreeNSMLeafFrameFactory implements ITreeIndexFrameFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    // Both fields are assigned once in the constructor and never change afterwards.
+    private final ITreeIndexTupleWriterFactory tupleWriterFactory;
+    private final int keyFieldCount;
+
+    /**
+     * @param tupleWriterFactory source of the tuple writer handed to every created frame
+     * @param keyFieldCount      key field count passed to every created frame
+     */
+    public RTreeNSMLeafFrameFactory(ITreeIndexTupleWriterFactory tupleWriterFactory, int keyFieldCount) {
+        this.tupleWriterFactory = tupleWriterFactory;
+        this.keyFieldCount = keyFieldCount;
+    }
+
+    /**
+     * Creates a new leaf frame backed by a newly created tuple writer.
+     */
+    @Override
+    public IRTreeLeafFrame createFrame() {
+        return new RTreeNSMLeafFrame(tupleWriterFactory.createTupleWriter(), keyFieldCount);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java
new file mode 100644
index 0000000..8dce1ac
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+/**
+ * Direction in which a set of entries is sorted.
+ */
+public enum EntriesOrder {
+    /** Sort in ascending order. */
+    ASCENDING,
+
+    /** Sort in descending order. */
+    DESCENDING
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/PathList.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/PathList.java
new file mode 100644
index 0000000..58bad69
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/PathList.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IntArrayList;
+
+/**
+ * A list of (page id, page LSN, page index) triples, kept as three parallel
+ * {@link IntArrayList}s. The structural operations ({@link #add}, {@link #moveFirst},
+ * {@link #moveLast}, {@link #clear}) are applied to all three lists so that position
+ * {@code i} in each list belongs to the same entry. Size and emptiness queries are answered
+ * from the page-id list.
+ */
+public class PathList {
+    // The three lists are created once and never replaced.
+    private final IntArrayList pageIds;
+    private final IntArrayList pageLsns;
+    private final IntArrayList pageIndexes;
+
+    /**
+     * @param initialCapacity initial capacity passed to each underlying list
+     * @param growth          growth parameter passed to each underlying list
+     */
+    public PathList(int initialCapacity, int growth) {
+        pageIds = new IntArrayList(initialCapacity, growth);
+        pageLsns = new IntArrayList(initialCapacity, growth);
+        pageIndexes = new IntArrayList(initialCapacity, growth);
+    }
+
+    /** Returns the size of the page-id list. */
+    public int size() {
+        return pageIds.size();
+    }
+
+    /**
+     * Returns {@code pageIds.first()}.
+     * NOTE(review): presumably the position of the current first entry rather than its
+     * value (compare {@link #getFirstPageId()}) - confirm against IntArrayList.
+     */
+    public int first() {
+        return pageIds.first();
+    }
+
+    /** Appends one entry, adding each component to its own list. */
+    public void add(int pageId, int pageLsn, int pageIndex) {
+        pageIds.add(pageId);
+        pageLsns.add(pageLsn);
+        pageIndexes.add(pageIndex);
+    }
+
+    /** Returns the page id of the first entry. */
+    public int getFirstPageId() {
+        return pageIds.getFirst();
+    }
+
+    /** Returns the page LSN of the first entry. */
+    public int getFirstPageLsn() {
+        return pageLsns.getFirst();
+    }
+
+    /** Returns the page index of the first entry. */
+    public int getFirstPageIndex() {
+        return pageIndexes.getFirst();
+    }
+
+    /** Returns the page id of the last entry. */
+    public int getLastPageId() {
+        return pageIds.getLast();
+    }
+
+    /** Returns the page LSN of the last entry. */
+    public int getLastPageLsn() {
+        return pageLsns.getLast();
+    }
+
+    /** Returns the page index of the last entry. */
+    public int getLastPageIndex() {
+        return pageIndexes.getLast();
+    }
+
+    /** Returns the page id stored at position {@code i}. */
+    public int getPageId(int i) {
+        return pageIds.get(i);
+    }
+
+    /** Returns the page LSN stored at position {@code i}. */
+    public int getPageLsn(int i) {
+        return pageLsns.get(i);
+    }
+
+    /** Returns the page index stored at position {@code i}. */
+    public int getPageIndex(int i) {
+        return pageIndexes.get(i);
+    }
+
+    /** Overwrites the page LSN stored at position {@code i}; the other two lists are untouched. */
+    public void setPageLsn(int i, int pageLsn) {
+        pageLsns.set(i, pageLsn);
+    }
+
+    /** Calls {@code moveFirst()} on all three lists. */
+    public void moveFirst() {
+        pageIds.moveFirst();
+        pageLsns.moveFirst();
+        pageIndexes.moveFirst();
+    }
+
+    /**
+     * Calls {@code removeLast()} on all three lists. Note that, despite the name mirroring
+     * {@link #moveFirst()}, this delegates to a removal, not to a "move" operation.
+     */
+    public void moveLast() {
+        pageIds.removeLast();
+        pageLsns.removeLast();
+        pageIndexes.removeLast();
+    }
+
+    /** Returns {@code pageIds.isLast()}. */
+    public boolean isLast() {
+        return pageIds.isLast();
+    }
+
+    /** Clears all three lists. */
+    public void clear() {
+        pageIds.clear();
+        pageLsns.clear();
+        pageIndexes.clear();
+    }
+
+    /** Returns whether the page-id list is empty. */
+    public boolean isEmpty() {
+        return pageIds.isEmpty();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
new file mode 100644
index 0000000..8976409
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -0,0 +1,937 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexType;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
+import edu.uci.ics.hyracks.storage.am.common.impls.TreeDiskOrderScanCursor;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public class RTree implements ITreeIndex {
+
+    // Set to true at the end of create(), once the root page is initialised.
+    private boolean created = false;
+    private boolean loaded = false;
+    private final int rootPage = 1; // the root page never changes
+
+    private final AtomicInteger globalNsn; // Global node sequence number
+    // Debug counter: starts at 1 and is incremented once per split in
+    // insertTuple().
+    private int numOfPages = 1;
+    // Taken in write mode by create().
+    private final ReadWriteLock treeLatch;
+
+    // Initialises the meta page in create() and hands out page ids on splits.
+    private final IFreePageManager freePageManager;
+    private final IBufferCache bufferCache;
+    // Set by open(), reset to -1 by close().
+    private int fileId;
+
+    private final SearchPredicate diskOrderScanPredicate;
+    private final ITreeIndexFrameFactory interiorFrameFactory;
+    private final ITreeIndexFrameFactory leafFrameFactory;
+    private final MultiComparator cmp;
+
+    // Debug statistics. They are public, updated by the increment* methods and
+    // by insertTuple(), and reported by printStats().
+    public int rootSplits = 0;
+    // Split counts indexed by tree level (level 0 is used for leaf splits).
+    public int[] splitsByLevel = new int[500];
+    public AtomicLong readLatchesAcquired = new AtomicLong();
+    public AtomicLong readLatchesReleased = new AtomicLong();
+    public AtomicLong writeLatchesAcquired = new AtomicLong();
+    public AtomicLong writeLatchesReleased = new AtomicLong();
+    public AtomicLong pins = new AtomicLong();
+    public AtomicLong unpins = new AtomicLong();
+    // Reset to 0 by create() and incremented on every root split.
+    public byte currentLevel = 0;
+
+    /**
+     * Wires the tree to its collaborators. No page is touched here; the root
+     * page is only initialised by {@code create}.
+     *
+     * @param bufferCache
+     *            cache used to pin and unpin pages
+     * @param freePageManager
+     *            source of free page ids
+     * @param interiorFrameFactory
+     *            factory for interior-node frames
+     * @param leafFrameFactory
+     *            factory for leaf-node frames
+     * @param cmp
+     *            comparator handed to all frame operations
+     */
+    public RTree(IBufferCache bufferCache, IFreePageManager freePageManager,
+            ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory, MultiComparator cmp) {
+        // collaborators supplied by the caller
+        this.cmp = cmp;
+        this.leafFrameFactory = leafFrameFactory;
+        this.interiorFrameFactory = interiorFrameFactory;
+        this.freePageManager = freePageManager;
+        this.bufferCache = bufferCache;
+
+        // state owned by this instance
+        this.globalNsn = new AtomicInteger(0);
+        this.treeLatch = new ReentrantReadWriteLock(true); // fair lock
+        this.diskOrderScanPredicate = new SearchPredicate(null, cmp);
+    }
+
+    /** Atomically advances the global node sequence number by one. */
+    public void incrementGlobalNsn() {
+        globalNsn.getAndIncrement();
+    }
+
+    /** Returns the current value of the global node sequence number. */
+    public int getGlobalNsn() {
+        final int currentNsn = globalNsn.get();
+        return currentNsn;
+    }
+
+    /** Debug statistics: counts one more acquired read latch. */
+    public void incrementReadLatchesAcquired() {
+        readLatchesAcquired.getAndIncrement();
+    }
+
+    /** Debug statistics: counts one more released read latch. */
+    public void incrementReadLatchesReleased() {
+        readLatchesReleased.getAndIncrement();
+    }
+
+    /** Debug statistics: counts one more acquired write latch. */
+    public void incrementWriteLatchesAcquired() {
+        writeLatchesAcquired.getAndIncrement();
+    }
+
+    /** Debug statistics: counts one more released write latch. */
+    public void incrementWriteLatchesReleased() {
+        writeLatchesReleased.getAndIncrement();
+    }
+
+    /** Debug statistics: counts one more page pin. */
+    public void incrementPins() {
+        pins.getAndIncrement();
+    }
+
+    /** Debug statistics: counts one more page unpin. */
+    public void incrementUnpins() {
+        unpins.getAndIncrement();
+    }
+
+    public String printStats() {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append("\n");
+        strBuilder.append("ROOTSPLITS: " + rootSplits + "\n");
+        strBuilder.append("SPLITS BY LEVEL\n");
+        for (int i = 0; i < currentLevel; i++) {
+            strBuilder.append(String.format("%3d ", i) + String.format("%8d ", splitsByLevel[i]) + "\n");
+        }
+        strBuilder.append(String.format("READ LATCHES:  %10d %10d\n", readLatchesAcquired.get(),
+                readLatchesReleased.get()));
+        strBuilder.append(String.format("WRITE LATCHES: %10d %10d\n", writeLatchesAcquired.get(),
+                writeLatchesReleased.get()));
+        strBuilder.append(String.format("PINS:          %10d %10d\n", pins.get(), unpins.get()));
+
+        strBuilder.append(String.format("Num of Pages:          %10d\n", numOfPages));
+
+        return strBuilder.toString();
+    }
+
+    /**
+     * Debug helper: prints the whole tree to standard output, starting at the
+     * root page with no parent page to release.
+     *
+     * @param leafFrame
+     *            frame used to read leaf pages
+     * @param interiorFrame
+     *            frame used to read interior pages
+     * @param fields
+     *            serializers passed through to the frames' printKeys
+     */
+    public void printTree(IRTreeFrame leafFrame, IRTreeFrame interiorFrame, ISerializerDeserializer[] fields)
+            throws Exception {
+        printTree(rootPage, null, false, leafFrame, interiorFrame, fields);
+    }
+
+    /**
+     * Debug helper: prints the subtree rooted at {@code pageId} to standard
+     * output, one line per page (level, page id, indentation, keys).
+     *
+     * The page is pinned and read-latched on entry. An interior page stays
+     * latched while its children are printed and is released by its last child
+     * (which is called with {@code unpin == true}); a leaf page releases
+     * itself.
+     *
+     * NOTE(review): if the last child throws after it has already released its
+     * parent, the parent's catch block releases the parent a second time.
+     *
+     * @param pageId
+     *            page to print
+     * @param parent
+     *            already latched parent page, or null for the root
+     * @param unpin
+     *            whether this call must release and unpin {@code parent}
+     * @param leafFrame
+     *            frame used to read leaf pages
+     * @param interiorFrame
+     *            frame used to read interior pages (must be an RTreeNSMFrame)
+     * @param fields
+     *            serializers passed through to printKeys
+     */
+    public void printTree(int pageId, ICachedPage parent, boolean unpin, IRTreeFrame leafFrame,
+            IRTreeFrame interiorFrame, ISerializerDeserializer[] fields) throws Exception {
+
+        ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+        incrementPins();
+        node.acquireReadLatch();
+        incrementReadLatchesAcquired();
+
+        try {
+            // the child is latched now, so the parent can be let go
+            if (parent != null && unpin) {
+                parent.releaseReadLatch();
+                incrementReadLatchesReleased();
+                bufferCache.unpin(parent);
+                incrementUnpins();
+            }
+
+            interiorFrame.setPage(node);
+            int level = interiorFrame.getLevel();
+
+            System.out.format("%1d ", level);
+            System.out.format("%3d ", pageId);
+            for (int i = 0; i < currentLevel - level; i++)
+                System.out.format("    ");
+
+            String keyString;
+            if (interiorFrame.isLeaf()) {
+                leafFrame.setPage(node);
+                keyString = leafFrame.printKeys(cmp, fields);
+            } else {
+                keyString = interiorFrame.printKeys(cmp, fields);
+            }
+
+            // keyString is data, not a format string: a '%' inside a printed
+            // key must be emitted verbatim instead of being interpreted.
+            System.out.print(keyString);
+            if (!interiorFrame.isLeaf()) {
+                // the child list is collected up front because the recursive
+                // calls re-point interiorFrame at the child pages
+                ArrayList<Integer> children = ((RTreeNSMFrame) (interiorFrame)).getChildren(cmp);
+                for (int i = 0; i < children.size(); i++) {
+                    printTree(children.get(i), node, i == children.size() - 1, leafFrame, interiorFrame, fields);
+                }
+            } else {
+                node.releaseReadLatch();
+                incrementReadLatchesReleased();
+                bufferCache.unpin(node);
+                incrementUnpins();
+            }
+        } catch (Exception e) {
+            node.releaseReadLatch();
+            incrementReadLatchesReleased();
+            bufferCache.unpin(node);
+            incrementUnpins();
+            throw e;
+        }
+    }
+
+    /**
+     * Initialises the index file once: sets up the free page manager and
+     * formats the root page as an empty leaf (level 0). Later calls return
+     * immediately.
+     *
+     * NOTE(review): the first {@code created} check happens outside the tree
+     * latch and the field is not volatile.
+     *
+     * @param fileId
+     *            file whose root page is initialised (the fileId field is not
+     *            modified here)
+     * @param leafFrame
+     *            frame used to format the root page
+     * @param metaFrame
+     *            frame handed to the free page manager
+     */
+    @Override
+    public void create(int fileId, ITreeIndexFrame leafFrame, ITreeIndexMetaDataFrame metaFrame) throws Exception {
+        if (created) {
+            return;
+        }
+
+        treeLatch.writeLock().lock();
+        try {
+            // somebody else may have finished while we waited for the latch
+            if (created) {
+                return;
+            }
+
+            freePageManager.init(metaFrame, rootPage);
+
+            // format the root page as an empty leaf
+            ICachedPage root = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), true);
+            incrementPins();
+
+            root.acquireWriteLatch();
+            incrementWriteLatchesAcquired();
+            try {
+                leafFrame.setPage(root);
+                leafFrame.initBuffer((byte) 0);
+            } finally {
+                root.releaseWriteLatch();
+                incrementWriteLatchesReleased();
+                bufferCache.unpin(root);
+                incrementUnpins();
+            }
+
+            currentLevel = 0;
+            created = true;
+        } finally {
+            treeLatch.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Remembers the file this tree operates on; all later page pins use this
+     * file id.
+     *
+     * @param fileId
+     *            id of the index file
+     */
+    public void open(int fileId) {
+        this.fileId = fileId;
+    }
+
+    /** Forgets the current file by resetting the file id to -1. */
+    public void close() {
+        fileId = -1;
+    }
+
+    /**
+     * Creates the per-operation context used by insert and delete.
+     *
+     * @param op
+     *            operation the context is created for
+     * @param leafFrame
+     *            must be an {@code IRTreeLeafFrame}
+     * @param interiorFrame
+     *            must be an {@code IRTreeInteriorFrame}
+     * @param metaFrame
+     *            meta-data frame stored in the context
+     * @return a new context; the trailing constructor argument is fixed at 8
+     * @throws ClassCastException
+     *             if a frame is not of the R-tree specific type
+     */
+    @Override
+    public RTreeOpContext createOpContext(IndexOp op, ITreeIndexFrame leafFrame, ITreeIndexFrame interiorFrame,
+            ITreeIndexMetaDataFrame metaFrame) {
+        final IRTreeLeafFrame rtreeLeafFrame = (IRTreeLeafFrame) leafFrame;
+        final IRTreeInteriorFrame rtreeInteriorFrame = (IRTreeInteriorFrame) interiorFrame;
+        return new RTreeOpContext(op, rtreeLeafFrame, rtreeInteriorFrame, metaFrame, 8);
+    }
+
+    /**
+     * Inserts {@code tuple} into the tree.
+     *
+     * The operation context is reset, {@code findLeaf} descends to the target
+     * leaf (returned pinned and write-latched), the tuple is inserted there,
+     * and any resulting split is pushed up through the recorded path until the
+     * split key no longer has a left page buffer.
+     *
+     * The leaf's write latch and pin are released in a {@code finally} block,
+     * so an exception during the insert or the upward propagation does not
+     * leave the leaf latched and pinned.
+     *
+     * @param tuple
+     *            tuple to insert
+     * @param ictx
+     *            must be an {@code RTreeOpContext}
+     */
+    @Override
+    public void insert(ITupleReference tuple, IndexOpContext ictx) throws Exception {
+        RTreeOpContext ctx = (RTreeOpContext) ictx;
+        ctx.reset();
+        ctx.setTuple(tuple);
+        ctx.splitKey.reset();
+        // split-key tuples and interior pages carry key fields only; leaf
+        // pages carry all fields
+        ctx.splitKey.getLeftTuple().setFieldCount(cmp.getKeyFieldCount());
+        ctx.splitKey.getRightTuple().setFieldCount(cmp.getKeyFieldCount());
+        ctx.interiorFrame.setPageTupleFieldCount(cmp.getKeyFieldCount());
+        ctx.leafFrame.setPageTupleFieldCount(cmp.getFieldCount());
+
+        // the leaf comes back pinned and write-latched
+        ICachedPage leafNode = findLeaf(ctx);
+        try {
+            // the leaf is the last path entry; pop it so the path ends at its
+            // parent
+            int pageId = ctx.pathList.getLastPageId();
+            ctx.pathList.moveLast();
+            insertTuple(leafNode, pageId, ctx.getTuple(), ctx, true);
+
+            // a non-null left page buffer means a split still has to be
+            // installed one level up
+            while (ctx.splitKey.getLeftPageBuffer() != null) {
+                updateParentForInsert(ctx);
+            }
+        } finally {
+            leafNode.releaseWriteLatch();
+            incrementWriteLatchesReleased();
+            bufferCache.unpin(leafNode);
+            incrementUnpins();
+        }
+    }
+
+    /**
+     * Descends from the root to the leaf that should receive
+     * {@code ctx.getTuple()}.
+     *
+     * At every interior page the best child is chosen and its entry is
+     * enlarged under a write latch before the descent continues. Each visited
+     * page is appended to {@code ctx.pathList} together with the LSN seen at
+     * that time. A page whose NSN is greater than the LSN remembered for its
+     * parent is treated as concurrently split, and the search steps back to
+     * the parent.
+     *
+     * @param ctx
+     *            operation context; its interior frame, leaf frame and path
+     *            list are modified
+     * @return the leaf page, pinned and write-latched, with
+     *         {@code ctx.leafFrame} set to it; the caller must release it
+     */
+    public ICachedPage findLeaf(RTreeOpContext ctx) throws Exception {
+        int pageId = rootPage;
+        boolean writeLatched = false;
+        ICachedPage node = null;
+        boolean isLeaf = false;
+        int pageLsn = 0, parentLsn = 0;
+
+        while (true) {
+            // When writeLatched is still true the current node is already
+            // pinned and write-latched from the previous iteration (retry
+            // after an LSN mismatch), so it is not pinned again.
+            if (!writeLatched) {
+                node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                incrementPins();
+                ctx.interiorFrame.setPage(node);
+                // first leaf check is done before any latch is held
+                isLeaf = ctx.interiorFrame.isLeaf();
+                if (isLeaf) {
+                    node.acquireWriteLatch();
+                    incrementWriteLatchesAcquired();
+                    writeLatched = true;
+
+                    // re-check under the latch: the page may have stopped
+                    // being a leaf in the meantime, in which case start over
+                    // on the same page id
+                    if (!ctx.interiorFrame.isLeaf()) {
+                        node.releaseWriteLatch();
+                        incrementWriteLatchesReleased();
+                        bufferCache.unpin(node);
+                        incrementUnpins();
+                        writeLatched = false;
+                        continue;
+                    }
+                } else {
+                    // Be optimistic and grab read latch first. We will swap it
+                    // to write latch if we need to enlarge the best child
+                    // tuple.
+                    node.acquireReadLatch();
+                    incrementReadLatchesAcquired();
+                }
+            }
+
+            if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                // Concurrent split detected, go back to parent and re-choose
+                // the best child
+                if (writeLatched) {
+                    node.releaseWriteLatch();
+                    incrementWriteLatchesReleased();
+                    bufferCache.unpin(node);
+                    incrementUnpins();
+                    writeLatched = false;
+                } else {
+                    node.releaseReadLatch();
+                    incrementReadLatchesReleased();
+                    bufferCache.unpin(node);
+                    incrementUnpins();
+                }
+
+                // the last path entry is the parent; unless it is the root,
+                // the entry before it supplies the LSN to compare against
+                pageId = ctx.pathList.getLastPageId();
+                if (pageId != rootPage) {
+                    parentLsn = ctx.pathList.getPageLsn(ctx.pathList.size() - 2);
+                }
+                ctx.pathList.moveLast();
+                continue;
+            }
+
+            pageLsn = ctx.interiorFrame.getPageLsn();
+            ctx.pathList.add(pageId, pageLsn, -1);
+
+            if (!isLeaf) {
+                // findBestChild must be called *before* getBestChildPageId
+                ctx.interiorFrame.findBestChild(ctx.getTuple(), cmp);
+                int childPageId = ctx.interiorFrame.getBestChildPageId(cmp);
+
+                if (!writeLatched) {
+                    // swap the read latch for a write latch; the page is
+                    // unprotected in between, hence the LSN check below
+                    node.releaseReadLatch();
+                    incrementReadLatchesReleased();
+                    // TODO: do we need to un-pin and pin again?
+                    bufferCache.unpin(node);
+                    incrementUnpins();
+
+                    node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                    incrementPins();
+                    node.acquireWriteLatch();
+                    incrementWriteLatchesAcquired();
+                    ctx.interiorFrame.setPage(node);
+                    writeLatched = true;
+
+                    if (ctx.interiorFrame.getPageLsn() != pageLsn) {
+                        // The page was changed while we unlocked it; thus,
+                        // retry (re-choose best child)
+
+                        // drop the path entry just added; the write latch is
+                        // kept for the retry
+                        ctx.pathList.moveLast();
+                        continue;
+                    }
+                }
+
+                // We don't need to reset the frameTuple because it is
+                // already pointing to the best child
+                ctx.interiorFrame.enlarge(ctx.getTuple(), cmp);
+
+                node.releaseWriteLatch();
+                incrementWriteLatchesReleased();
+                bufferCache.unpin(node);
+                incrementUnpins();
+                writeLatched = false;
+
+                // descend; the LSN read on this page becomes the reference
+                // for the split check on the child
+                pageId = childPageId;
+                parentLsn = pageLsn;
+            } else {
+                ctx.leafFrame.setPage(node);
+                return node;
+            }
+        }
+    }
+
+    /**
+     * Inserts {@code tuple} into the page currently set on the context's leaf
+     * frame ({@code isLeaf}) or interior frame ({@code !isLeaf}).
+     *
+     * Depending on the space status reported by the frame, the tuple is
+     * inserted directly, inserted after compacting the page, or the page is
+     * split into itself and a newly allocated right page. After a split
+     * {@code ctx.splitKey} describes the two halves so the caller can update
+     * the parent. If the split page is the root, the root is rebuilt in place
+     * one level higher and the split key is reset.
+     *
+     * @param node
+     *            the page being inserted into; the caller holds its write
+     *            latch, which is not released here
+     * @param pageId
+     *            page id of {@code node}
+     * @param tuple
+     *            tuple to insert
+     * @param ctx
+     *            operation context providing frames, split key and meta frame
+     * @param isLeaf
+     *            selects the leaf frame (true) or the interior frame (false)
+     */
+    private void insertTuple(ICachedPage node, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
+            throws Exception {
+        FrameOpSpaceStatus spaceStatus;
+        if (!isLeaf) {
+            spaceStatus = ctx.interiorFrame.hasSpaceInsert(tuple, cmp);
+        } else {
+            spaceStatus = ctx.leafFrame.hasSpaceInsert(tuple, cmp);
+        }
+
+        switch (spaceStatus) {
+            case SUFFICIENT_CONTIGUOUS_SPACE: {
+                // plain insert; the page LSN is stamped with a fresh global
+                // sequence number
+                if (!isLeaf) {
+                    ctx.interiorFrame.insert(tuple, cmp, -1);
+                    incrementGlobalNsn();
+                    ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                } else {
+                    ctx.leafFrame.insert(tuple, cmp, -1);
+                    incrementGlobalNsn();
+                    ctx.leafFrame.setPageLsn(getGlobalNsn());
+                }
+                // nothing to propagate upwards
+                ctx.splitKey.reset();
+                break;
+            }
+
+            case SUFFICIENT_SPACE: {
+                // same as above, but the page is compacted first
+                if (!isLeaf) {
+                    ctx.interiorFrame.compact(cmp);
+                    ctx.interiorFrame.insert(tuple, cmp, -1);
+                    incrementGlobalNsn();
+                    ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                } else {
+                    ctx.leafFrame.compact(cmp);
+                    ctx.leafFrame.insert(tuple, cmp, -1);
+                    incrementGlobalNsn();
+                    ctx.leafFrame.setPageLsn(getGlobalNsn());
+                }
+                ctx.splitKey.reset();
+                break;
+            }
+
+            case INSUFFICIENT_SPACE: {
+                // split: allocate a new right page and keep it write-latched
+                // for the duration of the split
+                int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
+                ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
+                incrementPins();
+                rightNode.acquireWriteLatch();
+                incrementWriteLatchesAcquired();
+
+                try {
+                    IRTreeFrame rightFrame;
+                    int ret;
+                    numOfPages++; // debug
+                    if (!isLeaf) {
+                        splitsByLevel[ctx.interiorFrame.getLevel()]++; // debug
+                        rightFrame = (IRTreeFrame) interiorFrameFactory.createFrame();
+                        rightFrame.setPage(rightNode);
+                        rightFrame.initBuffer((byte) ctx.interiorFrame.getLevel());
+                        rightFrame.setPageTupleFieldCount(cmp.getKeyFieldCount());
+                        ret = ctx.interiorFrame.split(rightFrame, tuple, cmp, ctx.splitKey);
+                        ctx.interiorFrame.setRightPage(rightPageId);
+                        // the right page inherits the old NSN of the split
+                        // page; both pages get the new sequence number as LSN
+                        // and the split page also gets it as NSN
+                        rightFrame.setPageNsn(ctx.interiorFrame.getPageNsn());
+                        incrementGlobalNsn();
+                        int newNsn = getGlobalNsn();
+                        rightFrame.setPageLsn(newNsn);
+                        ctx.interiorFrame.setPageNsn(newNsn);
+                        ctx.interiorFrame.setPageLsn(newNsn);
+                    } else {
+                        splitsByLevel[0]++; // debug
+                        rightFrame = (IRTreeFrame) leafFrameFactory.createFrame();
+                        rightFrame.setPage(rightNode);
+                        rightFrame.initBuffer((byte) 0);
+                        rightFrame.setPageTupleFieldCount(cmp.getFieldCount());
+                        ret = ctx.leafFrame.split(rightFrame, tuple, cmp, ctx.splitKey);
+                        ctx.leafFrame.setRightPage(rightPageId);
+                        // same NSN/LSN hand-over as in the interior case
+                        rightFrame.setPageNsn(ctx.leafFrame.getPageNsn());
+                        incrementGlobalNsn();
+                        int newNsn = getGlobalNsn();
+                        rightFrame.setPageLsn(newNsn);
+                        ctx.leafFrame.setPageNsn(newNsn);
+                        ctx.leafFrame.setPageLsn(newNsn);
+                    }
+                    // a non-zero result from split() discards the split key;
+                    // otherwise the key records the left and right page ids
+                    if (ret != 0) {
+                        ctx.splitKey.reset();
+                    } else {
+                        ctx.splitKey.setPages(pageId, rightPageId);
+                    }
+                    if (pageId == rootPage) {
+                        rootSplits++; // debug
+                        splitsByLevel[currentLevel]++;
+                        currentLevel++;
+
+                        // The root page id never changes: the old root content
+                        // moves to a fresh page and the root page itself is
+                        // re-initialised as the new, higher root.
+                        int newLeftId = freePageManager.getFreePage(ctx.metaFrame);
+                        ICachedPage newLeftNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, newLeftId),
+                                true);
+                        incrementPins();
+                        newLeftNode.acquireWriteLatch();
+                        incrementWriteLatchesAcquired();
+                        try {
+                            // copy left child to new left child
+                            System.arraycopy(node.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0,
+                                    newLeftNode.getBuffer().capacity());
+
+                            // initialize new root (leftNode becomes new root)
+                            ctx.interiorFrame.setPage(node);
+                            ctx.interiorFrame.initBuffer((byte) (ctx.interiorFrame.getLevel() + 1));
+
+                            // the left half now lives on the new page
+                            ctx.splitKey.setLeftPage(newLeftId);
+
+                            ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), cmp, -1);
+                            ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), cmp, -1);
+
+                            incrementGlobalNsn();
+                            int newNsn = getGlobalNsn();
+                            ctx.interiorFrame.setPageLsn(newNsn);
+                            ctx.interiorFrame.setPageNsn(newNsn);
+                        } finally {
+                            newLeftNode.releaseWriteLatch();
+                            incrementWriteLatchesReleased();
+                            bufferCache.unpin(newLeftNode);
+                            incrementUnpins();
+                        }
+
+                        // the root has no parent, so nothing is left to
+                        // propagate
+                        ctx.splitKey.reset();
+                    }
+                } finally {
+                    rightNode.releaseWriteLatch();
+                    incrementWriteLatchesReleased();
+                    bufferCache.unpin(rightNode);
+                    incrementUnpins();
+                }
+                break;
+            }
+        }
+    }
+
+    /**
+     * Installs a pending split (described by {@code ctx.splitKey}) in the
+     * parent page.
+     *
+     * The parent candidate is the last page on {@code ctx.pathList}. If its
+     * LSN no longer matches the one recorded on the path, the chain of right
+     * pages is followed until a page containing the entry for the split key's
+     * left tuple is found. On success the existing entry is adjusted, the
+     * right tuple is inserted (which may split the parent and refill the split
+     * key) and the path entry is popped. If no page on the chain holds the
+     * entry, the path is rebuilt with {@code findPath} and the method calls
+     * itself again.
+     *
+     * @param ctx
+     *            operation context holding the split key and the path
+     */
+    public void updateParentForInsert(RTreeOpContext ctx) throws Exception {
+        int parentId = ctx.pathList.getLastPageId();
+        ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+        incrementPins();
+        parentNode.acquireWriteLatch();
+        incrementWriteLatchesAcquired();
+        ctx.interiorFrame.setPage(parentNode);
+        boolean foundParent = true;
+
+        // an unchanged LSN means the page is still as it was during the
+        // descent and is accepted as the parent without searching
+        if (ctx.interiorFrame.getPageLsn() != ctx.pathList.getLastPageLsn()) {
+            foundParent = false;
+            while (true) {
+                if (ctx.interiorFrame.findTupleByPointer(ctx.splitKey.getLeftTuple(), cmp) != -1) {
+                    // found the parent
+                    foundParent = true;
+                    break;
+                }
+                // not on this page: release it and move to its right page
+                int rightPage = ctx.interiorFrame.getRightPage();
+                parentNode.releaseWriteLatch();
+                incrementWriteLatchesReleased();
+                bufferCache.unpin(parentNode);
+                incrementUnpins();
+
+                // end of the chain; no page is latched at this point
+                if (rightPage == -1) {
+                    break;
+                }
+
+                parentId = rightPage;
+                parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+                incrementPins();
+                parentNode.acquireWriteLatch();
+                incrementWriteLatchesAcquired();
+                ctx.interiorFrame.setPage(parentNode);
+            }
+        }
+        if (foundParent) {
+            ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), -1, cmp);
+            insertTuple(parentNode, parentId, ctx.splitKey.getRightTuple(), ctx, ctx.interiorFrame.isLeaf());
+            ctx.pathList.moveLast();
+
+            parentNode.releaseWriteLatch();
+            incrementWriteLatchesReleased();
+            bufferCache.unpin(parentNode);
+            incrementUnpins();
+            return;
+        }
+
+        // Very rare situation when there is a root split: do an exhaustive
+        // breadth-first traversal looking for the parent tuple.
+
+        ctx.pathList.clear();
+        ctx.traverseList.clear();
+        findPath(ctx);
+        updateParentForInsert(ctx);
+    }
+
+    /**
+     * Rebuilds {@code ctx.pathList} by searching the tree from the root for
+     * the page that holds the entry of the split key's left tuple.
+     *
+     * Pages to visit are queued in {@code ctx.traverseList}; each entry keeps
+     * the index of the entry it was reached from. When the entry is found,
+     * {@code fillPath} copies the chain of entries into {@code ctx.pathList}.
+     * If the queue is exhausted without a match, the path list is left as it
+     * was on entry.
+     *
+     * NOTE(review): the four-argument findTupleByPointer receives the traverse
+     * list and the current index, so it presumably enqueues the child pages -
+     * confirm in the frame implementation.
+     *
+     * @param ctx
+     *            operation context; callers clear both lists beforehand
+     */
+    public void findPath(RTreeOpContext ctx) throws Exception {
+        int pageId = rootPage;
+        int parentIndex = -1;
+        int parentLsn = 0;
+        int pageLsn, pageIndex;
+        ctx.traverseList.add(pageId, -1, parentIndex);
+        while (!ctx.traverseList.isLast()) {
+            pageId = ctx.traverseList.getFirstPageId();
+            parentIndex = ctx.traverseList.getFirstPageIndex();
+
+            ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            incrementPins();
+            node.acquireReadLatch();
+            incrementReadLatchesAcquired();
+            ctx.interiorFrame.setPage(node);
+            // remember the LSN seen now in the entry being visited
+            pageLsn = ctx.interiorFrame.getPageLsn();
+            pageIndex = ctx.traverseList.first();
+            ctx.traverseList.setPageLsn(pageIndex, pageLsn);
+
+            ctx.traverseList.moveFirst();
+
+            // same split test as in findLeaf: also queue the right page,
+            // attributed to the same parent entry
+            // NOTE(review): parentLsn holds the LSN of the previously visited
+            // page, which is not necessarily this page's parent - confirm.
+            if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                int rightPage = ctx.interiorFrame.getRightPage();
+                if (rightPage != -1) {
+                    ctx.traverseList.add(rightPage, -1, parentIndex);
+                }
+            }
+            parentLsn = pageLsn;
+
+            if (ctx.interiorFrame.findTupleByPointer(ctx.splitKey.getLeftTuple(), ctx.traverseList, pageIndex, cmp) != -1) {
+                fillPath(ctx, pageIndex);
+
+                node.releaseReadLatch();
+                incrementReadLatchesReleased();
+                bufferCache.unpin(node);
+                incrementUnpins();
+                return;
+            }
+            node.releaseReadLatch();
+            incrementReadLatchesReleased();
+            bufferCache.unpin(node);
+            incrementUnpins();
+        }
+    }
+
+    /**
+     * Copies the chain of traverse-list entries ending at {@code pageIndex}
+     * into {@code ctx.pathList}. Ancestors are added before descendants
+     * because the recursion follows the stored parent index first. An index of
+     * -1 ends the chain.
+     *
+     * @param ctx
+     *            operation context owning both lists
+     * @param pageIndex
+     *            traverse-list position of the entry to append, or -1
+     */
+    public void fillPath(RTreeOpContext ctx, int pageIndex) throws Exception {
+        if (pageIndex == -1) {
+            return;
+        }
+
+        // ancestors first
+        final int ancestorIndex = ctx.traverseList.getPageIndex(pageIndex);
+        fillPath(ctx, ancestorIndex);
+
+        final int entryPageId = ctx.traverseList.getPageId(pageIndex);
+        final int entryPageLsn = ctx.traverseList.getPageLsn(pageIndex);
+        ctx.pathList.add(entryPageId, entryPageLsn, -1);
+    }
+
+    /**
+     * Deletes {@code tuple} from the tree if it is present.
+     *
+     * {@code findTupleToDelete} locates the tuple; when it returns -1 nothing
+     * is changed. Otherwise the tuple is removed from the leaf set on
+     * {@code ctx.leafFrame} and the parents are updated for as long as the
+     * split key carries a left page buffer.
+     *
+     * The leaf's write latch and pin are released in a {@code finally} block,
+     * so an exception during the delete or the upward propagation does not
+     * leave the leaf latched and pinned.
+     *
+     * @param tuple
+     *            tuple to delete
+     * @param ictx
+     *            must be an {@code RTreeOpContext}
+     */
+    @Override
+    public void delete(ITupleReference tuple, IndexOpContext ictx) throws Exception {
+        RTreeOpContext ctx = (RTreeOpContext) ictx;
+        ctx.reset();
+        ctx.setTuple(tuple);
+        ctx.splitKey.reset();
+        ctx.splitKey.getLeftTuple().setFieldCount(cmp.getKeyFieldCount());
+        ctx.interiorFrame.setPageTupleFieldCount(cmp.getKeyFieldCount());
+        ctx.leafFrame.setPageTupleFieldCount(cmp.getFieldCount());
+
+        int tupleIndex = findTupleToDelete(ctx);
+        if (tupleIndex == -1) {
+            // tuple not found
+            return;
+        }
+
+        try {
+            // the leaf is the last path entry; pop it so the path ends at its
+            // parent
+            int pageId = ctx.pathList.getLastPageId();
+            ctx.pathList.moveLast();
+            deleteTuple(pageId, tupleIndex, ctx);
+
+            // a non-null left page buffer means a parent entry still has to
+            // be updated
+            while (ctx.splitKey.getLeftPageBuffer() != null) {
+                updateParentForDelete(ctx);
+            }
+        } finally {
+            ICachedPage leafNode = ctx.leafFrame.getPage();
+            leafNode.releaseWriteLatch();
+            incrementWriteLatchesReleased();
+            bufferCache.unpin(leafNode);
+            incrementUnpins();
+        }
+    }
+
+    /**
+     * Propagates a bounding-rectangle change caused by a delete to the parent
+     * page.
+     *
+     * The parent is located like in {@code updateParentForInsert}: the last
+     * page on {@code ctx.pathList} is used if its LSN is unchanged, otherwise
+     * the right-page chain is searched for the entry of the split key's left
+     * tuple. If {@code recomputeMBR} reports a change, the entry is adjusted,
+     * the page LSN is bumped and, unless the path is now empty, the split key
+     * is refilled with this page's MBR so the caller continues one level up.
+     * Otherwise the split key is reset, which ends the propagation. If the
+     * parent cannot be found, the path is rebuilt with {@code findPath} and
+     * the method calls itself again.
+     *
+     * @param ctx
+     *            operation context holding the split key and the path
+     */
+    public void updateParentForDelete(RTreeOpContext ctx) throws Exception {
+        int parentId = ctx.pathList.getLastPageId();
+        ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+        incrementPins();
+        parentNode.acquireWriteLatch();
+        incrementWriteLatchesAcquired();
+        ctx.interiorFrame.setPage(parentNode);
+        boolean foundParent = true;
+        int tupleIndex = -1;
+
+        // an unchanged LSN means the page is accepted as the parent without
+        // searching; tupleIndex then stays -1 and is looked up further down
+        if (ctx.interiorFrame.getPageLsn() != ctx.pathList.getLastPageLsn()) {
+            foundParent = false;
+            while (true) {
+                tupleIndex = ctx.interiorFrame.findTupleByPointer(ctx.splitKey.getLeftTuple(), cmp);
+                if (tupleIndex != -1) {
+                    // found the parent
+                    foundParent = true;
+                    break;
+                }
+                // not on this page: release it and move to its right page
+                int rightPage = ctx.interiorFrame.getRightPage();
+                parentNode.releaseWriteLatch();
+                incrementWriteLatchesReleased();
+                bufferCache.unpin(parentNode);
+                incrementUnpins();
+
+                // end of the chain; no page is latched at this point
+                if (rightPage == -1) {
+                    break;
+                }
+
+                parentId = rightPage;
+                parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+                incrementPins();
+                parentNode.acquireWriteLatch();
+                incrementWriteLatchesAcquired();
+                ctx.interiorFrame.setPage(parentNode);
+            }
+        }
+        if (foundParent) {
+            if (tupleIndex == -1) {
+                tupleIndex = ctx.interiorFrame.findTupleByPointer(ctx.splitKey.getLeftTuple(), cmp);
+            }
+            boolean recomputeMBR = ctx.interiorFrame.recomputeMBR(ctx.splitKey.getLeftTuple(), tupleIndex, cmp);
+
+            if (recomputeMBR) {
+                ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), tupleIndex, cmp);
+                ctx.pathList.moveLast();
+
+                incrementGlobalNsn();
+                ctx.interiorFrame.setPageLsn(getGlobalNsn());
+
+                ctx.splitKey.reset();
+                // more ancestors remain: hand this page's MBR upwards
+                if (!ctx.pathList.isEmpty()) {
+                    ctx.interiorFrame.computeMBR(ctx.splitKey, cmp);
+                    ctx.splitKey.setLeftPage(parentId);
+                }
+            } else {
+                ctx.pathList.moveLast();
+                ctx.splitKey.reset();
+            }
+
+            parentNode.releaseWriteLatch();
+            incrementWriteLatchesReleased();
+            bufferCache.unpin(parentNode);
+            incrementUnpins();
+            return;
+        }
+
+        // Very rare situation when there is a root split: do an exhaustive
+        // breadth-first traversal looking for the parent tuple.
+
+        ctx.pathList.clear();
+        ctx.traverseList.clear();
+        findPath(ctx);
+        updateParentForDelete(ctx);
+    }
+
+    public int findTupleToDelete(RTreeOpContext ctx) throws Exception {
+        // Searches from the root for ctx.tuple; returns its index in the leaf set on ctx.leafFrame, or -1.
+        ctx.traverseList.add(rootPage, -1, -1);
+        ctx.pathList.add(rootPage, -1, ctx.traverseList.size() - 1);
+        // pathList entries are read from the tail (getLast*/moveLast); each carries the index of its traverseList record.
+        while (!ctx.pathList.isEmpty()) {
+            int pageId = ctx.pathList.getLastPageId();
+            int parentLsn = ctx.pathList.getLastPageLsn();
+            int pageIndex = ctx.pathList.getLastPageIndex();
+            ctx.pathList.moveLast();
+            ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            incrementPins();
+            node.acquireReadLatch();
+            incrementReadLatchesAcquired();
+            ctx.interiorFrame.setPage(node);
+            boolean isLeaf = ctx.interiorFrame.isLeaf();
+            int pageLsn = ctx.interiorFrame.getPageLsn();
+            int parentIndex = ctx.traverseList.getPageIndex(pageIndex);
+            ctx.traverseList.setPageLsn(pageIndex, pageLsn);
+            // Non-root pages only: compare the LSN recorded at the parent with this page's NSN.
+            if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                // Concurrent split detected, we need to visit the right page
+                int rightPage = ctx.interiorFrame.getRightPage();
+                if (rightPage != -1) {
+                    ctx.traverseList.add(rightPage, -1, parentIndex);
+                    ctx.pathList.add(rightPage, parentLsn, ctx.traverseList.size() - 1);
+                }
+            }
+            // Interior page: queue each child id returned by getChildPageIdIfIntersect (-1 means nothing to queue).
+            if (!isLeaf) {
+                for (int i = 0; i < ctx.interiorFrame.getTupleCount(); i++) {
+                    int childPageId = ctx.interiorFrame.getChildPageIdIfIntersect(ctx.tuple, i, cmp);
+                    if (childPageId != -1) {
+                        ctx.traverseList.add(childPageId, -1, pageIndex);
+                        ctx.pathList.add(childPageId, pageLsn, ctx.traverseList.size() - 1);
+                    }
+                }
+            } else {
+                ctx.leafFrame.setPage(node);
+                int tupleIndex = ctx.leafFrame.findTupleIndex(ctx.tuple, cmp);
+                if (tupleIndex != -1) {
+                    // Found under a read latch: release and unpin, then pin again and take the write latch.
+                    node.releaseReadLatch();
+                    incrementReadLatchesReleased();
+                    bufferCache.unpin(node);
+                    incrementUnpins();
+
+                    node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                    incrementPins();
+                    node.acquireWriteLatch();
+                    incrementWriteLatchesAcquired();
+                    ctx.leafFrame.setPage(node);
+                    // The page LSN sampled under the read latch tells whether the page changed in between.
+                    if (ctx.leafFrame.getPageLsn() != pageLsn) {
+                        // The page was changed while we unlocked it
+
+                        tupleIndex = ctx.leafFrame.findTupleIndex(ctx.tuple, cmp);
+                        if (tupleIndex == -1) {
+                            ctx.traverseList.add(pageId, -1, parentIndex);
+                            ctx.pathList.add(pageId, parentLsn, ctx.traverseList.size() - 1);
+                            // Not in this page any more: it was re-queued above; release it and keep searching.
+                            node.releaseWriteLatch();
+                            incrementWriteLatchesReleased();
+                            bufferCache.unpin(node);
+                            incrementUnpins();
+                            continue;
+                        } else {
+                            ctx.pathList.clear();
+                            fillPath(ctx, pageIndex);
+                            return tupleIndex; // the leaf is still pinned and write-latched here
+                        }
+                    } else {
+                        ctx.pathList.clear();
+                        fillPath(ctx, pageIndex);
+                        return tupleIndex; // the leaf is still pinned and write-latched here
+                    }
+                }
+            }
+            node.releaseReadLatch();
+            incrementReadLatchesReleased();
+            bufferCache.unpin(node);
+            incrementUnpins();
+        }
+        return -1;
+    }
+
+    public void deleteTuple(int pageId, int tupleIndex, RTreeOpContext ctx) throws Exception {
+        ctx.leafFrame.delete(tupleIndex, cmp);
+        incrementGlobalNsn();
+        ctx.leafFrame.setPageLsn(getGlobalNsn());
+        // Nothing to report for the root or for a leaf that is now empty (it is kept for future inserts).
+        if (pageId == rootPage || ctx.leafFrame.getTupleCount() <= 0) {
+            return;
+        }
+        ctx.leafFrame.computeMBR(ctx.splitKey, cmp);
+        ctx.splitKey.setLeftPage(pageId);
+    }
+
+    public void search(ITreeIndexCursor cursor, SearchPredicate pred, RTreeOpContext ctx) throws Exception {
+        // Clear the context, register the cursor on it, then configure and open the cursor at the root.
+        ctx.reset();
+        ctx.cursor = cursor;
+        cursor.setBufferCache(bufferCache);
+        cursor.setFileId(fileId);
+        ctx.cursorInitialState.setRootPage(rootPage);
+        cursor.open(ctx.cursorInitialState, pred);
+    }
+
+    public ITreeIndexFrameFactory getInteriorFrameFactory() {
+        return interiorFrameFactory; // the factory instance held by this tree, not a copy
+    }
+
+    public ITreeIndexFrameFactory getLeafFrameFactory() {
+        return leafFrameFactory; // the factory instance held by this tree, not a copy
+    }
+
+    public MultiComparator getCmp() {
+        return cmp; // the same comparator this tree passes to its frames
+    }
+
+    public IFreePageManager getFreePageManager() {
+        return freePageManager; // also consulted by diskOrderScan for the max page id
+    }
+
+    @Override
+    public void update(ITupleReference tuple, IndexOpContext ictx) throws Exception {
+        throw new UnsupportedOperationException("RTree Update not implemented."); // still an Exception for callers, but names the real condition
+    }
+
+    public final class BulkLoadContext implements IIndexBulkLoadContext {
+        // The only state is an insert context: bulkLoadAddTuple passes it to insert() for every tuple.
+        public RTreeOpContext insertOpCtx;
+        // NOTE(review): fillFactor is accepted but never read in this constructor.
+        public BulkLoadContext(float fillFactor, IRTreeFrame leafFrame, IRTreeFrame interiorFrame,
+                ITreeIndexMetaDataFrame metaFrame) throws HyracksDataException {
+            // Created through the enclosing tree's createOpContext with IndexOp.INSERT.
+            insertOpCtx = createOpContext(IndexOp.INSERT, leafFrame, interiorFrame, metaFrame);
+        }
+    }
+
+    @Override
+    public IIndexBulkLoadContext beginBulkLoad(float fillFactor, ITreeIndexFrame leafFrame,
+            ITreeIndexFrame interiorFrame, ITreeIndexMetaDataFrame metaFrame) throws HyracksDataException {
+        // A tree that has already completed a bulk load (loaded == true) cannot start another one.
+        if (loaded) {
+            throw new HyracksDataException("Trying to bulk-load RTree but has RTree already been loaded.");
+        }
+        IRTreeFrame rtreeLeafFrame = (IRTreeFrame) leafFrame;
+        return new BulkLoadContext(fillFactor, rtreeLeafFrame, (IRTreeFrame) interiorFrame, metaFrame);
+    }
+
+    @Override
+    public void bulkLoadAddTuple(IIndexBulkLoadContext ictx, ITupleReference tuple) throws HyracksDataException {
+        try {
+            insert(tuple, ((BulkLoadContext) ictx).insertOpCtx); // each bulk-loaded tuple goes through a regular insert
+        } catch (Exception e) {
+            throw new HyracksDataException("BulkLoad Error", e); // chain the cause so the real failure stays visible
+        }
+    }
+
+    @Override
+    public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
+        loaded = true; // from now on beginBulkLoad throws; ictx itself is not touched
+    }
+
+    @Override
+    public void diskOrderScan(ITreeIndexCursor icursor, ITreeIndexFrame leafFrame, ITreeIndexMetaDataFrame metaFrame,
+            IndexOpContext ictx) throws HyracksDataException {
+        TreeDiskOrderScanCursor cursor = (TreeDiskOrderScanCursor) icursor;
+        RTreeOpContext ctx = (RTreeOpContext) ictx;
+        ctx.reset();
+        // The cursor gets rootPage + 1 as its first page id and the free page manager's max page as its max page id.
+        int currentPageId = rootPage + 1;
+        int maxPageId = freePageManager.getMaxPage(metaFrame);
+        // NOTE(review): the first page is pinned and read-latched here and never released in this method; presumably the cursor does it.
+        ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, currentPageId), false);
+        page.acquireReadLatch();
+        cursor.setBufferCache(bufferCache);
+        cursor.setFileId(fileId);
+        cursor.setCurrentPageId(currentPageId);
+        cursor.setMaxPageId(maxPageId);
+        ctx.cursorInitialState.setPage(page);
+        cursor.open(ctx.cursorInitialState, diskOrderScanPredicate);
+    }
+
+    @Override
+    public int getRootPageId() {
+        return rootPage; // same page id that search() hands to the cursor as its starting point
+    }
+
+    @Override
+    public int getFieldCount() {
+        return cmp.getFieldCount(); // delegated to the tree's comparator
+    }
+
+    @Override
+    public IndexType getIndexType() {
+        return IndexType.RTREE; // constant for every instance of this class
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
new file mode 100644
index 0000000..8b94a04
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeCursorInitialState.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class RTreeCursorInitialState implements ICursorInitialState {
+    // Values a cursor reads when it is opened: the page list to work on, the root page id and, optionally, a page.
+    private final PathList pathList;
+    private int rootPage;
+    private ICachedPage page; // for disk order scan
+
+    public RTreeCursorInitialState(PathList pathList, int rootPage) {
+        this.rootPage = rootPage;
+        this.pathList = pathList;
+    }
+
+    public PathList getPathList() {
+        return this.pathList;
+    }
+
+    public int getRootPage() {
+        return this.rootPage;
+    }
+
+    public ICachedPage getPage() {
+        return this.page;
+    }
+
+    public void setRootPage(int rootPage) {
+        this.rootPage = rootPage;
+    }
+
+    public void setPage(ICachedPage page) {
+        this.page = page;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
new file mode 100644
index 0000000..ea8af28
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOpContext;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+
+public final class RTreeOpContext implements IndexOpContext {
+    private static final int initTraverseListSize = 100;
+    public final IndexOp op;
+    public final IRTreeInteriorFrame interiorFrame;
+    public final IRTreeLeafFrame leafFrame;
+    public final ITreeIndexMetaDataFrame metaFrame;
+    public final RTreeSplitKey splitKey; // null when op is SEARCH or DISKORDERSCAN
+    public final PathList pathList; // records the pageIds and pageLsns of the visited pages
+    public final PathList traverseList; // used for traversing the tree; null when op is SEARCH or DISKORDERSCAN
+    public ITreeIndexCursor cursor;
+    public RTreeCursorInitialState cursorInitialState; // only created when op is SEARCH or DISKORDERSCAN
+    public ITupleReference tuple;
+
+    public RTreeOpContext(IndexOp op, IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame,
+            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint) {
+        this.op = op;
+        this.leafFrame = leafFrame;
+        this.interiorFrame = interiorFrame;
+        this.metaFrame = metaFrame;
+        this.pathList = new PathList(treeHeightHint, treeHeightHint);
+        if (op == IndexOp.SEARCH || op == IndexOp.DISKORDERSCAN) {
+            this.splitKey = null;
+            this.traverseList = null;
+            this.cursorInitialState = new RTreeCursorInitialState(pathList, 1);
+        } else {
+            // Every other operation gets a split key built on two fresh interior-frame tuple references.
+            this.splitKey = new RTreeSplitKey(interiorFrame.getTupleWriter().createTupleReference(),
+                    interiorFrame.getTupleWriter().createTupleReference());
+            this.traverseList = new PathList(initTraverseListSize, initTraverseListSize);
+        }
+    }
+
+    public ITupleReference getTuple() {
+        return tuple;
+    }
+
+    public void setTuple(ITupleReference tuple) {
+        this.tuple = tuple;
+    }
+
+    public void reset() {
+        if (pathList != null) {
+            pathList.clear();
+        }
+        if (traverseList != null) {
+            traverseList.clear();
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
new file mode 100644
index 0000000..86a7bfd
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public class RTreeSearchCursor implements ITreeIndexCursor {
+    // Iterates over the leaf tuples that intersect a search key. Pages still to be visited are kept in
+    // pathList; the cursor holds a pin and a read latch on at most one page at a time.
+    private int fileId = -1;
+    private ICachedPage page = null; // current leaf; null when there is nothing (left) to iterate
+    private IRTreeInteriorFrame interiorFrame = null;
+    private IRTreeLeafFrame leafFrame = null;
+    private IBufferCache bufferCache = null;
+
+    private SearchPredicate pred;
+    private PathList pathList;
+    private int rootPage;
+    ITupleReference searchKey;
+
+    private int tupleIndex = 0; // next slot of the current leaf to examine
+    private int tupleIndexInc = 0; // slot right after the tuple last exposed by hasNext()
+
+    private MultiComparator cmp;
+    private ITreeIndexTupleReference frameTuple;
+    private boolean readLatched = false; // true while this cursor holds a read latch and the matching pin
+
+    private int pin = 0; // pin / unpin only count the calls made on the buffer cache
+    private int unpin = 0;
+
+    public RTreeSearchCursor(IRTreeInteriorFrame interiorFrame, IRTreeLeafFrame leafFrame) {
+        this.interiorFrame = interiorFrame;
+        this.leafFrame = leafFrame;
+        this.frameTuple = leafFrame.createTupleReference();
+    }
+
+    // Releases the page still held, if any, and puts the cursor back into its unopened state.
+    @Override
+    public void close() throws Exception {
+        if (readLatched) {
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+            unpin++;
+            readLatched = false;
+        }
+        tupleIndex = 0;
+        tupleIndexInc = 0;
+        page = null;
+        pathList = null;
+    }
+
+    public ITupleReference getTuple() {
+        return frameTuple;
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        return page;
+    }
+
+    // Releases the current leaf, then walks pathList until it reaches the next leaf (true) or runs out (false).
+    public boolean fetchNextLeafPage() throws HyracksDataException {
+        if (readLatched) {
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+            unpin++;
+            readLatched = false;
+        }
+        while (!pathList.isEmpty()) {
+            int pageId = pathList.getLastPageId();
+            int parentLsn = pathList.getLastPageLsn();
+            pathList.moveLast();
+            ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            pin++;
+            node.acquireReadLatch();
+            readLatched = true;
+            interiorFrame.setPage(node);
+            boolean isLeaf = interiorFrame.isLeaf();
+            int pageLsn = interiorFrame.getPageLsn();
+            if (pageId != rootPage && parentLsn < interiorFrame.getPageNsn()) {
+                // Concurrent split detected, we need to visit the right page
+                int rightPage = interiorFrame.getRightPage();
+                if (rightPage != -1) {
+                    pathList.add(rightPage, parentLsn, -1);
+                }
+            }
+            if (!isLeaf) {
+                for (int i = 0; i < interiorFrame.getTupleCount(); i++) {
+                    int childPageId = interiorFrame.getChildPageIdIfIntersect(searchKey, i, cmp);
+                    if (childPageId != -1) {
+                        pathList.add(childPageId, pageLsn, -1);
+                    }
+                }
+            } else {
+                page = node;
+                leafFrame.setPage(page);
+                tupleIndex = 0;
+                return true;
+            }
+            node.releaseReadLatch();
+            readLatched = false;
+            bufferCache.unpin(node);
+            unpin++;
+        }
+        page = null; // exhausted: drop the released leaf so hasNext() never reads an unpinned page
+        return false;
+    }
+
+    // Positions frameTuple on the first intersecting tuple at or after tupleIndex, switching leaves as needed.
+    @Override
+    public boolean hasNext() throws Exception {
+        if (page == null) {
+            return false;
+        }
+        if (tupleIndex == leafFrame.getTupleCount()) {
+            if (!fetchNextLeafPage()) {
+                return false;
+            }
+        }
+        do {
+            for (int i = tupleIndex; i < leafFrame.getTupleCount(); i++) {
+                if (leafFrame.intersect(searchKey, i, cmp)) {
+                    frameTuple.resetByTupleIndex(leafFrame, i);
+                    tupleIndexInc = i + 1;
+                    return true;
+                }
+            }
+        } while (fetchNextLeafPage());
+        return false;
+    }
+
+    @Override
+    public void next() throws Exception {
+        tupleIndex = tupleIndexInc; // continue after the tuple that hasNext() last exposed
+    }
+
+    // (Re)starts the search described by searchPred from the root page carried by initialState.
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws Exception {
+        // open() called again without close(): readLatched, as in close(), says whether a page is really held
+        if (readLatched) {
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+            unpin++;
+            readLatched = false;
+            pathList.clear();
+        }
+        pathList = ((RTreeCursorInitialState) initialState).getPathList();
+        rootPage = ((RTreeCursorInitialState) initialState).getRootPage();
+        pred = (SearchPredicate) searchPred;
+        cmp = pred.getLowKeyComparator();
+        searchKey = pred.getSearchKey();
+        pathList.add(this.rootPage, -1, -1);
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        tupleIndex = 0;
+        fetchNextLeafPage();
+    }
+
+    @Override
+    public void reset() {
+        try {
+            close();
+        } catch (Exception e) {
+            e.printStackTrace(); // best effort: a failing close() is reported but not propagated
+        }
+    }
+
+    @Override
+    public void setBufferCache(IBufferCache bufferCache) {
+        this.bufferCache = bufferCache;
+    }
+
+    @Override
+    public void setFileId(int fileId) {
+        this.fileId = fileId;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSplitKey.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSplitKey.java
new file mode 100644
index 0000000..f220ea3
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeSplitKey.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+
+public class RTreeSplitKey implements ISplitKey {
+    // Two-sided split key: each side is a byte buffer laid out as [keySize key bytes][4-byte page id],
+    // with a tuple reference positioned at offset 0 of that buffer.
+    public byte[] leftPageData = null;
+    public ByteBuffer leftPageBuf = null;
+    public ITreeIndexTupleReference leftTuple;
+
+    public byte[] rightPageData = null;
+    public ByteBuffer rightPageBuf = null;
+    public ITreeIndexTupleReference rightTuple;
+
+    public int keySize = 0;
+
+    public RTreeSplitKey(ITreeIndexTupleReference leftTuple, ITreeIndexTupleReference rightTuple) {
+        this.leftTuple = leftTuple;
+        this.rightTuple = rightTuple;
+    }
+
+    // Sizes both buffers for a key of keySize bytes and points the tuples at them. Buffers that are
+    // already large enough are kept as they are.
+    public void initData(int keySize) {
+        // try to reuse existing memory from a lower-level split if possible
+        this.keySize = keySize;
+        final int required = keySize + 4; // add 4 for the page
+        if (leftPageData == null || leftPageData.length < required) {
+            leftPageData = new byte[required];
+            leftPageBuf = ByteBuffer.wrap(leftPageData);
+        }
+        if (rightPageData == null || rightPageData.length < required) {
+            rightPageData = new byte[required];
+            rightPageBuf = ByteBuffer.wrap(rightPageData);
+        }
+        leftTuple.resetByTupleOffset(leftPageBuf, 0);
+        rightTuple.resetByTupleOffset(rightPageBuf, 0);
+    }
+
+    public void resetLeftPage() {
+        leftPageData = null;
+        leftPageBuf = null;
+    }
+
+    public void resetRightPage() {
+        rightPageData = null;
+        rightPageBuf = null;
+    }
+
+    public ByteBuffer getLeftPageBuffer() {
+        return leftPageBuf;
+    }
+
+    public ByteBuffer getRightPageBuffer() {
+        return rightPageBuf;
+    }
+
+    public ITreeIndexTupleReference getLeftTuple() {
+        return leftTuple;
+    }
+
+    public ITreeIndexTupleReference getRightTuple() {
+        return rightTuple;
+    }
+
+    // The page id of each side is stored right behind the key bytes, i.e. at offset keySize.
+    public int getLeftPage() {
+        return leftPageBuf.getInt(keySize);
+    }
+
+    public int getRightPage() {
+        return rightPageBuf.getInt(keySize);
+    }
+
+    public void setLeftPage(int page) {
+        leftPageBuf.putInt(keySize, page);
+    }
+
+    public void setRightPage(int page) {
+        rightPageBuf.putInt(keySize, page);
+    }
+
+    // Deep copy onto the given tuple references: both byte arrays are cloned and re-wrapped, and the
+    // copy's tuples are reset onto the cloned buffers.
+    public ISplitKey duplicate(ITreeIndexTupleReference copyLeftTuple, ITreeIndexTupleReference copyRightTuple) {
+        RTreeSplitKey copy = new RTreeSplitKey(copyLeftTuple, copyRightTuple);
+        // keySize must travel with the data, otherwise the copy addresses its page ids at offset 0
+        copy.keySize = keySize;
+        copy.leftPageData = leftPageData.clone();
+        copy.leftPageBuf = ByteBuffer.wrap(copy.leftPageData);
+        copy.leftTuple.setFieldCount(leftTuple.getFieldCount());
+        copy.leftTuple.resetByTupleOffset(copy.leftPageBuf, 0);
+
+        copy.rightPageData = rightPageData.clone();
+        copy.rightPageBuf = ByteBuffer.wrap(copy.rightPageData);
+        copy.rightTuple.setFieldCount(rightTuple.getFieldCount());
+        copy.rightTuple.resetByTupleOffset(copy.rightPageBuf, 0);
+        return copy;
+    }
+
+    // Drops both buffers; initData must run again before the buffers or page ids are accessed.
+    @Override
+    public void reset() {
+        leftPageData = null;
+        leftPageBuf = null;
+        rightPageData = null;
+        rightPageBuf = null;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        // TODO not implemented for the two-sided key: always returns null
+        return null;
+    }
+
+    @Override
+    public ITreeIndexTupleReference getTuple() {
+        // TODO not implemented for the two-sided key: always returns null
+        return null;
+    }
+
+    @Override
+    public void setPages(int leftPage, int rightPage) {
+        leftPageBuf.putInt(keySize, leftPage);
+        rightPageBuf.putInt(keySize, rightPage);
+    }
+
+    @Override
+    public ISplitKey duplicate(ITreeIndexTupleReference copyTuple) {
+        // TODO not implemented for the two-sided key: always returns null
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
new file mode 100644
index 0000000..1fb86a9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+
+public class Rectangle {
+    // Axis-aligned box in dim dimensions: low[i] / high[i] bound axis i. Tuples handed to this class are
+    // read as doubles, with the low bound of axis i in field i and its high bound in field dim + i.
+    private int dim;
+    private double[] low;
+    private double[] high;
+
+    public Rectangle(int dim) {
+        this.dim = dim;
+        low = new double[this.dim];
+        high = new double[this.dim];
+    }
+
+    private static double doubleAt(ITupleReference tuple, int field) {
+        return DoubleSerializerDeserializer.getDouble(tuple.getFieldData(field), tuple.getFieldStart(field));
+    }
+
+    public int getDim() {
+        return dim;
+    }
+
+    public double getLow(int i) {
+        return low[i];
+    }
+
+    public double getHigh(int i) {
+        return high[i];
+    }
+
+    public void setLow(int i, double value) {
+        low[i] = value;
+    }
+
+    public void setHigh(int i, double value) {
+        high[i] = value;
+    }
+
+    // Overwrites every bound of this rectangle with the bounds stored in the tuple.
+    public void set(ITupleReference tuple) {
+        for (int axis = 0; axis < getDim(); axis++) {
+            setLow(axis, doubleAt(tuple, axis));
+            setHigh(axis, doubleAt(tuple, axis + getDim()));
+        }
+    }
+
+    // Grows this rectangle, axis by axis, just enough to also cover the tuple's rectangle.
+    public void enlarge(ITupleReference tupleToBeInserted) {
+        for (int axis = 0; axis < getDim(); axis++) {
+            final double candidateLow = doubleAt(tupleToBeInserted, axis);
+            if (candidateLow < getLow(axis)) {
+                setLow(axis, candidateLow);
+            }
+            final double candidateHigh = doubleAt(tupleToBeInserted, getDim() + axis);
+            if (candidateHigh > getHigh(axis)) {
+                setHigh(axis, candidateHigh);
+            }
+        }
+    }
+
+    // Sum over all axes of the side length scaled by 2^(dim - 1).
+    public double margin() {
+        final double scale = Math.pow(2, (double) getDim() - 1.0);
+        double total = 0.0;
+        for (int axis = 0; axis < getDim(); axis++) {
+            total += (getHigh(axis) - getLow(axis)) * scale;
+        }
+        return total;
+    }
+
+    // Product of the per-axis overlaps with the other rectangle; 0.0 as soon as one axis does not overlap.
+    public double overlappedArea(ITupleReference tuple) {
+        double shared = 1.0;
+        for (int axis = 0; axis < getDim(); axis++) {
+            final double otherLow = doubleAt(tuple, axis);
+            final double otherHigh = doubleAt(tuple, getDim() + axis);
+            if (getLow(axis) > otherHigh || getHigh(axis) < otherLow) {
+                return 0.0;
+            }
+            final double from = Math.max(getLow(axis), otherLow);
+            final double to = Math.min(getHigh(axis), otherHigh);
+            shared *= to - from;
+        }
+        return shared;
+    }
+
+    public double overlappedArea(Rectangle rec) {
+        double shared = 1.0;
+        for (int axis = 0; axis < getDim(); axis++) {
+            if (getLow(axis) > rec.getHigh(axis) || getHigh(axis) < rec.getLow(axis)) {
+                return 0.0;
+            }
+            final double from = Math.max(getLow(axis), rec.getLow(axis));
+            final double to = Math.min(getHigh(axis), rec.getHigh(axis));
+            shared *= to - from;
+        }
+        return shared;
+    }
+
+    // Product of the side lengths of the rectangle stored in the tuple; only getDim() of this object is used.
+    public double area(ITupleReference tuple) {
+        double volume = 1.0;
+        for (int axis = 0; axis < getDim(); axis++) {
+            volume *= doubleAt(tuple, getDim() + axis) - doubleAt(tuple, axis);
+        }
+        return volume;
+    }
+
+    // Product of the side lengths of this rectangle.
+    public double area() {
+        double volume = 1.0;
+        for (int axis = 0; axis < getDim(); axis++) {
+            volume *= getHigh(axis) - getLow(axis);
+        }
+        return volume;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SearchPredicate.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SearchPredicate.java
new file mode 100644
index 0000000..cd3a0ef
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SearchPredicate.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public class SearchPredicate implements ISearchPredicate {
+    // Pairs the key of an R-tree search with the comparator used to evaluate it.
+    private static final long serialVersionUID = 1L;
+
+    protected MultiComparator cmp;
+    protected ITupleReference searchKey;
+
+    public SearchPredicate(ITupleReference searchKey, MultiComparator cmp) {
+        this.cmp = cmp;
+        this.searchKey = searchKey;
+    }
+
+    public void setSearchKey(ITupleReference searchKey) {
+        this.searchKey = searchKey;
+    }
+
+    public ITupleReference getSearchKey() {
+        return this.searchKey;
+    }
+
+    // One comparator is returned as both the low-key and the high-key comparator.
+    public MultiComparator getLowKeyComparator() {
+        return this.cmp;
+    }
+
+    public MultiComparator getHighKeyComparator() {
+        return this.cmp;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java
new file mode 100644
index 0000000..0cb3de8
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+
+/** Sortable (tuple index, value) pair; ordering looks at the value only. */
+public class TupleEntry implements Comparable<TupleEntry> {
+    private int tupleIndex;
+    private double value;
+
+    public TupleEntry() {
+    }
+
+    public int getTupleIndex() {
+        return tupleIndex;
+    }
+
+    public void setTupleIndex(int tupleIndex) {
+        this.tupleIndex = tupleIndex;
+    }
+
+    public double getValue() {
+        return value;
+    }
+
+    public void setValue(double value) {
+        this.value = value;
+    }
+
+    /**
+     * Orders by value via Double.compare. The previous epsilon-tolerant test was not transitive and
+     * treated NaN as equal to everything, violating the Comparable contract that sorting relies on.
+     */
+    @Override
+    public int compareTo(TupleEntry tupleEntry) {
+        return Double.compare(getValue(), tupleEntry.getValue());
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java
new file mode 100644
index 0000000..8be8251
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Growable list of reusable TupleEntry objects; only the first size() slots hold live data. */
+public class TupleEntryArrayList {
+    private TupleEntry[] data;
+    private int size;
+    private final int growth;
+
+    public TupleEntryArrayList(int initialCapacity, int growth) {
+        data = new TupleEntry[initialCapacity];
+        this.growth = growth;
+    }
+
+    public int size() {
+        return size;
+    }
+    /** Appends an entry, reusing the TupleEntry object already parked in the next slot, if any. */
+    public void add(int tupleIndex, double value) {
+        if (size == data.length) {
+            // Always grow by at least one slot: with growth <= 0 the array would otherwise not get
+            // larger and the write to data[size] below would fail.
+            data = Arrays.copyOf(data, data.length + Math.max(1, growth));
+        }
+        if (data[size] == null) {
+            data[size] = new TupleEntry();
+        }
+        data[size].setTupleIndex(tupleIndex);
+        data[size].setValue(value);
+        size++;
+    }
+    /** Drops the last entry, if there is one; its object stays in the array for reuse by add(). */
+    public void removeLast() {
+        if (size > 0)
+            size--;
+    }
+
+    // WARNING: caller is responsible for checking size > 0
+    public TupleEntry getLast() {
+        return data[size - 1];
+    }
+    /** Returns slot i without checking it against size(); slots past size() may be stale or null. */
+    public TupleEntry get(int i) {
+        return data[i];
+    }
+    /** Logically empties the list; already allocated entries are kept for reuse. */
+    public void clear() {
+        size = 0;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+    /** Sorts slots [0, tupleCount) in place; callers must keep tupleCount <= size() (null slots fail). */
+    public void sort(EntriesOrder order, int tupleCount) {
+        if (order == EntriesOrder.ASCENDING) {
+            Arrays.sort(data, 0, tupleCount);
+        } else {
+            Arrays.sort(data, 0, tupleCount, Collections.reverseOrder());
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
new file mode 100644
index 0000000..d2b1c53
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.frames.AbstractSlotManager;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMFrame;
+
+/** Slot manager for frames that keep their tuples unordered; tuple lookup is a linear scan. */
+public class UnorderedSlotManager extends AbstractSlotManager {
+    /**
+     * Returns the index of the first frame tuple equal to searchKey, or -1 if there is none. Key fields
+     * are compared with the comparators of multiCmp, the rest byte-wise; mode and matchPolicy are unused.
+     */
+    @Override
+    public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
+            FindTupleMode mode, FindTupleNoExactMatchPolicy matchPolicy) {
+        int keyFieldCount = multiCmp.getKeyFieldCount();
+        int maxFieldPos = keyFieldCount / 2;
+        for (int i = 0; i < frame.getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(frame, i);
+            // Both loops stop at the first mismatch, so a rejected tuple costs no further comparisons.
+            boolean foundTuple = true;
+            for (int j = 0; foundTuple && j < maxFieldPos; j++) {
+                foundTuple = compareKey(searchKey, frameTuple, multiCmp, j) == 0
+                        && compareKey(searchKey, frameTuple, multiCmp, maxFieldPos + j) == 0;
+            }
+            for (int j = keyFieldCount; foundTuple && j < multiCmp.getFieldCount(); j++) {
+                foundTuple = compareField(searchKey, frameTuple, j);
+            }
+            if (foundTuple) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Compares field f of the frame tuple with field f of the key using the f-th comparator of cmp. */
+    private int compareKey(ITupleReference key, ITreeIndexTupleReference tuple, MultiComparator cmp, int f) {
+        return cmp.getComparators()[f].compare(tuple.getFieldData(f), tuple.getFieldStart(f), tuple.getFieldLength(f),
+                key.getFieldData(f), key.getFieldStart(f), key.getFieldLength(f));
+    }
+
+    /** Returns true if field fIdx has the same length and the same bytes in both tuples. */
+    public boolean compareField(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, int fIdx) {
+        int length = searchKey.getFieldLength(fIdx);
+        if (length != frameTuple.getFieldLength(fIdx)) {
+            return false;
+        }
+        byte[] keyBytes = searchKey.getFieldData(fIdx);
+        byte[] tupleBytes = frameTuple.getFieldData(fIdx);
+        int keyStart = searchKey.getFieldStart(fIdx);
+        int tupleStart = frameTuple.getFieldStart(fIdx);
+        for (int i = 0; i < length; i++) {
+            if (keyBytes[keyStart + i] != tupleBytes[tupleStart + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Stores tupleOff in a new slot at getSlotEndOff() - slotSize; tupleIndex is not used. */
+    @Override
+    public int insertSlot(int tupleIndex, int tupleOff) {
+        int slotOff = getSlotEndOff() - slotSize;
+        setSlot(slotOff, tupleOff);
+        return slotOff;
+    }
+
+    /** Overwrites the slot at slotOff with tupleOff. */
+    public void modifySlot(int slotOff, int tupleOff) {
+        setSlot(slotOff, tupleOff);
+    }
+
+    /**
+     * Compacts the slot area: a slot holding -1 is overwritten with the current end slot and the tuple
+     * count is decremented; -1 slots at the end are dropped by decrementing the count alone.
+     */
+    public void deleteEmptySlots() {
+        int slotOff = getSlotStartOff();
+        while (slotOff >= getSlotEndOff()) {
+            if (frame.getBuffer().getInt(slotOff) == -1) {
+                // Stop at zero tuples: presumably no valid end slot is left to read then (TODO confirm).
+                while (frame.getTupleCount() > 0 && frame.getBuffer().getInt(getSlotEndOff()) == -1) {
+                    ((RTreeNSMFrame) frame).setTupleCount(frame.getTupleCount() - 1);
+                }
+                if (frame.getTupleCount() == 0 || slotOff <= getSlotEndOff()) {
+                    break;
+                }
+                System.arraycopy(frame.getBuffer().array(), getSlotEndOff(), frame.getBuffer().array(), slotOff,
+                        slotSize);
+                ((RTreeNSMFrame) frame).setTupleCount(frame.getTupleCount() - 1);
+            }
+            slotOff -= slotSize;
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
new file mode 100644
index 0000000..96820c9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.tuples;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriter;
+
+/** Tuple writer that assembles one tuple from fields taken out of several tuple references. */
+public class RTreeTypeAwareTupleWriter extends TypeAwareTupleWriter {
+    public RTreeTypeAwareTupleWriter(ITypeTrait[] typeTraits) {
+        super(typeTraits);
+    }
+
+    /**
+     * Writes null-flag bytes (all zero), length slots for variable-length fields, then the field bytes.
+     * refs[k] supplies field startField + k in both loops (they disagreed for startField > 0; TODO confirm).
+     * @return number of bytes written, counted from targetOff
+     */
+    public int writeTupleFields(ITreeIndexTupleReference[] refs, int startField, ByteBuffer targetBuf, int targetOff) {
+        int runner = targetOff;
+        int nullFlagsBytes = getNullFlagsBytes(refs.length);
+        for (int i = 0; i < nullFlagsBytes; i++) {
+            targetBuf.put(runner++, (byte) 0);
+        }
+        // since the r-tree has fixed length keys, we don't actually need the field slots?
+        encDec.reset(targetBuf.array(), runner);
+        for (int k = 0; k < refs.length; k++) {
+            if (typeTraits[startField + k].getStaticallyKnownDataLength() == ITypeTrait.VARIABLE_LENGTH) {
+                encDec.encode(refs[k].getFieldLength(startField + k));
+            }
+        }
+        runner = encDec.getPos();
+        for (int k = 0; k < refs.length; k++) {
+            int length = refs[k].getFieldLength(startField + k);
+            System.arraycopy(refs[k].getFieldData(startField + k), refs[k].getFieldStart(startField + k),
+                    targetBuf.array(), runner, length);
+            runner += length;
+        }
+        return runner - targetOff;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriterFactory.java b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriterFactory.java
new file mode 100644
index 0000000..e2e99c7
--- /dev/null
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.tuples;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
+
+/** Factory that hands out RTreeTypeAwareTupleWriter instances built over one array of type traits. */
+public class RTreeTypeAwareTupleWriterFactory implements ITreeIndexTupleWriterFactory {
+    private static final long serialVersionUID = 1L;
+    private final ITypeTrait[] typeTraits;
+
+    /** Keeps a reference to the given array; it is not copied. */
+    public RTreeTypeAwareTupleWriterFactory(ITypeTrait[] typeTraits) {
+        this.typeTraits = typeTraits;
+    }
+    /** Creates a new writer over the stored type traits on every call. */
+    @Override
+    public ITreeIndexTupleWriter createTupleWriter() {
+        return new RTreeTypeAwareTupleWriter(this.typeTraits);
+    }
+}