Added BTree Access Method to Hyracks

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@86 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/.classpath b/hyracks-storage-am-btree/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-storage-am-btree/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-storage-am-btree/.project b/hyracks-storage-am-btree/.project
new file mode 100644
index 0000000..754745f
--- /dev/null
+++ b/hyracks-storage-am-btree/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>hyracks-storage-am-btree</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-storage-am-btree/.settings/org.eclipse.jdt.core.prefs b/hyracks-storage-am-btree/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..6c283ad
--- /dev/null
+++ b/hyracks-storage-am-btree/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,6 @@
+#Sun Aug 29 21:59:34 PDT 2010
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
+org.eclipse.jdt.core.compiler.compliance=1.6
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks-storage-am-btree/.settings/org.maven.ide.eclipse.prefs b/hyracks-storage-am-btree/.settings/org.maven.ide.eclipse.prefs
new file mode 100644
index 0000000..ecf0da7
--- /dev/null
+++ b/hyracks-storage-am-btree/.settings/org.maven.ide.eclipse.prefs
@@ -0,0 +1,9 @@
+#Sun Aug 29 21:59:34 PDT 2010
+activeProfiles=
+eclipse.preferences.version=1
+fullBuildGoals=process-test-resources
+includeModules=false
+resolveWorkspaceProjects=true
+resourceFilterGoals=process-resources resources\:testResources
+skipCompilerPlugin=true
+version=1
diff --git a/hyracks-storage-am-btree/pom.xml b/hyracks-storage-am-btree/pom.xml
new file mode 100644
index 0000000..d8b0abf
--- /dev/null
+++ b/hyracks-storage-am-btree/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-storage-am-btree</artifactId>
+  <version>0.1.2-SNAPSHOT</version>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.1.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-storage-common</artifactId>
+  		<version>0.1.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/compressors/FieldPrefixCompressor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/compressors/FieldPrefixCompressor.java
new file mode 100644
index 0000000..e383eec
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/compressors/FieldPrefixCompressor.java
@@ -0,0 +1,434 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.compressors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.frames.FieldPrefixNSMLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.impls.FieldIterator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixSlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IComparator;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFieldAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFrameCompressor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IPrefixSlotManager;
+
+public class FieldPrefixCompressor implements IFrameCompressor {
+	
+	public void compress(FieldPrefixNSMLeaf frame, MultiComparator cmp) throws HyracksDataException {
+    	int numRecords = frame.getNumRecords();       
+        //System.out.println("NUMRECORDS: " + numRecords);
+    	if(numRecords <= 0) {
+            frame.setNumPrefixRecords(0);
+            frame.setFreeSpaceOff(frame.getOrigFreeSpaceOff());
+            frame.setTotalFreeSpace(frame.getOrigTotalFreeSpace());
+            return;
+        }
+        
+        IComparator[] cmps = cmp.getComparators();
+        IFieldAccessor[] fields = cmp.getFields();
+        
+        ByteBuffer buf = frame.getBuffer();
+        byte[] pageArray = buf.array();
+        IPrefixSlotManager slotManager = frame.slotManager;
+        
+        int occurrenceThreshold = 2;
+        
+        // perform analysis pass
+        ArrayList<KeyPartition> keyPartitions = getKeyPartitions(frame, cmp, occurrenceThreshold);
+        if(keyPartitions.size() == 0) return;
+        
+        //System.out.println("KEYPARTITIONS: " + keyPartitions.size());
+        
+        // for each keyPartition, determine the best prefix length for compression, and count how many prefix records we would need in total
+        int totalSlotsNeeded = 0;
+        int totalPrefixBytes = 0;
+        for(KeyPartition kp : keyPartitions) {                                	
+        	for(int j = 0; j < kp.pmi.length; j++) {
+                int benefitMinusCost = kp.pmi[j].spaceBenefit - kp.pmi[j].spaceCost;            
+                if(benefitMinusCost > kp.maxBenefitMinusCost) {
+                    kp.maxBenefitMinusCost = benefitMinusCost;
+                    kp.maxPmiIndex = j;
+                }
+            }
+        	        	
+            if(kp.maxBenefitMinusCost <= 0) continue; // ignore keyPartitions with no benefit
+            
+            totalPrefixBytes += kp.pmi[kp.maxPmiIndex].prefixBytes;
+            totalSlotsNeeded += kp.pmi[kp.maxPmiIndex].prefixSlotsNeeded;
+        }
+        //System.out.println("TOTAL SLOTS NEEDED: " + totalSlotsNeeded);
+        
+        // we use a greedy heuristic to solve this "knapsack"-like problem
+        // (every keyPartition has a space savings and a number of slots required, but we the number of slots are constrained by MAX_PREFIX_SLOTS)
+        // we sort the keyPartitions by maxBenefitMinusCost / prefixSlotsNeeded and later choose the top MAX_PREFIX_SLOTS        
+        int[] newPrefixSlots;
+        if(totalSlotsNeeded > FieldPrefixSlotManager.MAX_PREFIX_SLOTS) {        	        	
+        	// order keyPartitions by the heuristic function
+            SortByHeuristic heuristicComparator = new SortByHeuristic();
+            Collections.sort(keyPartitions, heuristicComparator);
+            int slotsUsed = 0;
+            int numberKeyPartitions = -1;
+            for(int i = 0; i < keyPartitions.size(); i++) {
+                KeyPartition kp = keyPartitions.get(i); 
+                slotsUsed += kp.pmi[kp.maxPmiIndex].prefixSlotsNeeded;
+                if(slotsUsed > FieldPrefixSlotManager.MAX_PREFIX_SLOTS) {
+                    numberKeyPartitions = i + 1;
+                    slotsUsed -= kp.pmi[kp.maxPmiIndex].prefixSlotsNeeded;
+                    break;
+                }                
+            }
+            newPrefixSlots = new int[slotsUsed];
+            
+            // remove irrelevant keyPartitions and adjust total prefix bytes
+            while(keyPartitions.size() >= numberKeyPartitions) {            	            	
+            	int lastIndex = keyPartitions.size() - 1;
+            	KeyPartition kp = keyPartitions.get(lastIndex);
+            	if(kp.maxBenefitMinusCost > 0) totalPrefixBytes -= kp.pmi[kp.maxPmiIndex].prefixBytes;            	
+            	keyPartitions.remove(lastIndex);
+            }
+            
+            // re-order keyPartitions by prefix (corresponding to original order)
+            SortByOriginalRank originalRankComparator = new SortByOriginalRank();
+            Collections.sort(keyPartitions, originalRankComparator);                        
+        }
+        else {
+            newPrefixSlots = new int[totalSlotsNeeded];
+        }
+        
+        //System.out.println("TOTALPREFIXBYTES: " + totalPrefixBytes);
+        
+        int[] newRecordSlots = new int[numRecords];
+        
+        // WARNING: our hope is that compression is infrequent
+        // here we allocate a big chunk of memory to temporary hold the new, re-compressed records
+        // in general it is very hard to avoid this step        
+        int prefixFreeSpace = frame.getOrigFreeSpaceOff();
+        int recordFreeSpace = prefixFreeSpace + totalPrefixBytes;
+        byte[] buffer = new byte[buf.capacity()];        
+        
+        // perform compression, and reorg
+        // we assume that the keyPartitions are sorted by the prefixes (i.e., in the logical target order)
+        int kpIndex = 0;
+        int recSlotNum = 0;
+        int prefixSlotNum = 0;
+        FieldIterator recToWrite = new FieldIterator(fields, frame);
+        while(recSlotNum < numRecords) {           
+            if(kpIndex < keyPartitions.size()) {
+            	
+            	// beginning of keyPartition found, compress entire keyPartition
+            	if(recSlotNum == keyPartitions.get(kpIndex).firstRecSlotNum) {            		
+            		
+            		// number of fields we decided to use for compression of this keyPartition
+            		int numFieldsToCompress = keyPartitions.get(kpIndex).maxPmiIndex + 1;            		
+            		int segmentStart = keyPartitions.get(kpIndex).firstRecSlotNum;
+            		int recordsInSegment = 1;
+            		
+            		//System.out.println("PROCESSING KEYPARTITION: " + kpIndex + " RANGE: " + keyPartitions.get(kpIndex).firstRecSlotNum + " " + keyPartitions.get(kpIndex).lastRecSlotNum + " FIELDSTOCOMPRESS: " + numFieldsToCompress);
+            		
+            		FieldIterator prevRec = new FieldIterator(fields, frame);        
+                    FieldIterator rec = new FieldIterator(fields, frame);
+                    
+                    for(int i = recSlotNum + 1; i <= keyPartitions.get(kpIndex).lastRecSlotNum; i++) {
+                    	prevRec.openRecSlotNum(i - 1);
+                        rec.openRecSlotNum(i);
+                        
+                        // check if records match in numFieldsToCompress of their first fields
+                        int prefixFieldsMatch = 0;
+                        for(int j = 0; j < numFieldsToCompress; j++) {                                                    
+                            if(cmps[j].compare(pageArray, prevRec.getFieldOff(), pageArray, rec.getFieldOff()) == 0) prefixFieldsMatch++;
+                            else break;
+                            prevRec.nextField();
+                            rec.nextField();
+                        }
+                                                                  	                       
+                        // the two records must match in exactly the number of fields we decided to compress for this keyPartition
+                        int processSegments = 0;
+                        if(prefixFieldsMatch == numFieldsToCompress) recordsInSegment++;                        
+                        else processSegments++;
+
+                        if(i == keyPartitions.get(kpIndex).lastRecSlotNum) processSegments++;
+                        
+                        for(int r = 0; r < processSegments; r++) {
+                        	// compress current segment and then start new segment                       
+                        	if(recordsInSegment < occurrenceThreshold || numFieldsToCompress <= 0) {
+                        		// segment does not have at least occurrenceThreshold records, so write records uncompressed
+                        		for(int j = 0; j < recordsInSegment; j++) {
+                        		    int slotNum = segmentStart + j;
+                        		    recToWrite.openRecSlotNum(slotNum);
+                        		    newRecordSlots[numRecords - 1 - slotNum] = slotManager.encodeSlotFields(FieldPrefixSlotManager.RECORD_UNCOMPRESSED, recordFreeSpace);
+                        		    recordFreeSpace += recToWrite.copyFields(0, fields.length - 1, buffer, recordFreeSpace);
+                        		}
+                        	}
+                        	else {
+                        	    // segment has enough records, compress segment
+                        		// extract prefix, write prefix record to buffer, and set prefix slot
+                        	    newPrefixSlots[newPrefixSlots.length - 1 - prefixSlotNum] = slotManager.encodeSlotFields(numFieldsToCompress, prefixFreeSpace);
+                        	    //int tmp = freeSpace;
+                        	    //prevRec.reset();
+                        	    //System.out.println("SOURCE CONTENTS: " + buf.getInt(prevRec.getFieldOff()) + " " + buf.getInt(prevRec.getFieldOff()+4));
+                        	    prefixFreeSpace += prevRec.copyFields(0, numFieldsToCompress - 1, buffer, prefixFreeSpace);            	                            	                           	    
+                        	    //System.out.println("WRITING PREFIX RECORD " + prefixSlotNum + " AT " + tmp + " " + freeSpace);
+                        	    //System.out.print("CONTENTS: ");
+                        	    //for(int x = 0; x < numFieldsToCompress; x++) System.out.print(buf.getInt(tmp + x*4) + " ");
+                        	    //System.out.println();
+                        	    
+                        		// truncate records, write them to buffer, and set record slots
+                        	    for(int j = 0; j < recordsInSegment; j++) {
+                        	        int slotNum = segmentStart + j;
+                                    recToWrite.openRecSlotNum(slotNum);
+                                    newRecordSlots[numRecords - 1 - slotNum] = slotManager.encodeSlotFields(prefixSlotNum, recordFreeSpace);
+                                    recordFreeSpace += recToWrite.copyFields(numFieldsToCompress, fields.length - 1, buffer, recordFreeSpace);                              
+                                }
+                        	    
+                        	    prefixSlotNum++;
+                        	}
+                        	
+                        	// begin new segment
+                        	segmentStart = i;
+                        	recordsInSegment = 1;               	
+                        }                   
+                    }
+                    
+                    recSlotNum = keyPartitions.get(kpIndex).lastRecSlotNum;
+                	kpIndex++; 	
+                }
+            	else {
+                    // just write the record uncompressed
+            	    recToWrite.openRecSlotNum(recSlotNum);
+            	    newRecordSlots[numRecords - 1 - recSlotNum] = slotManager.encodeSlotFields(FieldPrefixSlotManager.RECORD_UNCOMPRESSED, recordFreeSpace);
+            	    recordFreeSpace += recToWrite.copyFields(0, fields.length - 1, buffer, recordFreeSpace);
+            	}
+            }
+            else {
+                // just write the record uncompressed
+                recToWrite.openRecSlotNum(recSlotNum);
+                newRecordSlots[numRecords - 1 - recSlotNum] = slotManager.encodeSlotFields(FieldPrefixSlotManager.RECORD_UNCOMPRESSED, recordFreeSpace);
+                recordFreeSpace += recToWrite.copyFields(0, fields.length - 1, buffer, recordFreeSpace);
+            }   
+            recSlotNum++;
+        }            
+        
+        // sanity check to see if we have written exactly as many prefix bytes as computed before
+        if(prefixFreeSpace != frame.getOrigFreeSpaceOff() + totalPrefixBytes) {
+        	throw new HyracksDataException("ERROR: Number of prefix bytes written don't match computed number");
+        }
+                
+        // in some rare instances our procedure could even increase the space requirement which is very dangerous
+        // this can happen to to the greedy solution of the knapsack-like problem
+        // therefore, we check if the new space exceeds the page size to avoid the only danger of an increasing space
+        int totalSpace = recordFreeSpace + newRecordSlots.length * slotManager.getSlotSize() + newPrefixSlots.length * slotManager.getSlotSize();
+        if(totalSpace > buf.capacity()) return; // just leave the page as is
+        
+        // copy new records and new slots into original page
+        int freeSpaceAfterInit = frame.getOrigFreeSpaceOff();
+        System.arraycopy(buffer, freeSpaceAfterInit, pageArray, freeSpaceAfterInit, recordFreeSpace - freeSpaceAfterInit);
+        
+        // copy prefix slots
+        int slotOffRunner = buf.capacity() - slotManager.getSlotSize();
+        for(int i = 0; i < newPrefixSlots.length; i++) {
+            buf.putInt(slotOffRunner, newPrefixSlots[newPrefixSlots.length - 1 - i]);
+            slotOffRunner -= slotManager.getSlotSize();
+        }
+        
+        // copy record slots
+        for(int i = 0; i < newRecordSlots.length; i++) {
+            buf.putInt(slotOffRunner, newRecordSlots[newRecordSlots.length - 1 - i]);
+            slotOffRunner -= slotManager.getSlotSize();
+        }
+        
+//        int originalFreeSpaceOff = frame.getOrigFreeSpaceOff();   
+//        System.out.println("ORIGINALFREESPACE: " + originalFreeSpaceOff);
+//        System.out.println("RECSPACE BEF: " + (frame.getFreeSpaceOff() - originalFreeSpaceOff));
+//        System.out.println("RECSPACE AFT: " + (recordFreeSpace - originalFreeSpaceOff));
+//        System.out.println("PREFIXSLOTS BEF: " + frame.getNumPrefixRecords());
+//        System.out.println("PREFIXSLOTS AFT: " + newPrefixSlots.length);
+//        
+//        System.out.println("FREESPACE BEF: " + frame.getFreeSpaceOff());
+//        System.out.println("FREESPACE AFT: " + recordFreeSpace);
+//        System.out.println("PREFIXES: " + newPrefixSlots.length + " / " + FieldPrefixSlotManager.MAX_PREFIX_SLOTS);
+//        System.out.println("RECORDS: " + newRecordSlots.length);
+        
+        // update space fields, TODO: we need to update more fields 
+        frame.setFreeSpaceOff(recordFreeSpace);
+        frame.setNumPrefixRecords(newPrefixSlots.length);
+        int totalFreeSpace = buf.capacity() - recordFreeSpace - ((newRecordSlots.length + newPrefixSlots.length) * slotManager.getSlotSize());
+        frame.setTotalFreeSpace(totalFreeSpace);
+    }
+	
+    // we perform an analysis pass over the records to determine the costs and benefits of different compression options
+    // a "keypartition" is a range of records that has an identical first field
+    // for each keypartition we chose a prefix length to use for compression
+    // i.e., all records in a keypartition will be compressed based on the same prefix length (number of fields)
+    // the prefix length may be different for different keypartitions
+    // the occurrenceThreshold determines the minimum number of records that must share a common prefix in order for us to consider compressing them        
+    private ArrayList<KeyPartition> getKeyPartitions(FieldPrefixNSMLeaf frame, MultiComparator cmp, int occurrenceThreshold) {        
+    	IComparator[] cmps = cmp.getComparators();
+        IFieldAccessor[] fields = cmp.getFields();
+                
+        int maxCmps = cmps.length - 1;
+        ByteBuffer buf = frame.getBuffer();
+        byte[] pageArray = buf.array();
+        IPrefixSlotManager slotManager = frame.slotManager;
+        
+        ArrayList<KeyPartition> keyPartitions = new ArrayList<KeyPartition>();        
+        KeyPartition kp = new KeyPartition(maxCmps);        
+        keyPartitions.add(kp);
+        
+        FieldIterator prevRec = new FieldIterator(fields, frame);        
+        FieldIterator rec = new FieldIterator(fields, frame);
+        
+        kp.firstRecSlotNum = 0;        
+        int numRecords = frame.getNumRecords();
+        for(int i = 1; i < numRecords; i++) {        	        	
+        	prevRec.openRecSlotNum(i-1);
+        	rec.openRecSlotNum(i);
+        	
+            //System.out.println("BEFORE RECORD: " + i + " " + rec.recSlotOff + " " + rec.recOff);
+            //kp.print();
+            
+            int prefixFieldsMatch = 0;            
+            int prefixBytes = 0; // counts the bytes of the common prefix fields
+            
+            for(int j = 0; j < maxCmps; j++) {            	                
+                // debug
+            	if(rec.getFieldOff() > 50000) {
+                	System.out.println("NUMRECORDS: " + numRecords + " " + frame.getNumPrefixRecords());
+                	int recSlotOff = slotManager.getRecSlotOff(i);
+                	int recSlot = buf.getInt(recSlotOff);
+                	int prefixSlotNum = slotManager.decodeFirstSlotField(recSlot);
+                	int recOff = slotManager.decodeSecondSlotField(recSlot);
+                	
+                	System.out.println("HERE: " + recSlotOff + " " + recSlot + " " + prefixSlotNum + " " + recOff);
+                	String keys = frame.printKeys(cmp);
+                	System.out.println(keys);
+                }
+            	            	
+            	if(cmps[j].compare(pageArray, prevRec.getFieldOff(), pageArray, rec.getFieldOff()) == 0) {
+                    prefixFieldsMatch++;
+                    kp.pmi[j].matches++;                     
+                    prefixBytes += rec.getFieldSize();            
+                    
+                    if(kp.pmi[j].matches == occurrenceThreshold) {
+                        // if we compress this prefix, we pay the cost of storing it once, plus the size for one prefix slot
+                        kp.pmi[j].prefixBytes += prefixBytes;
+                        kp.pmi[j].spaceCost += prefixBytes + slotManager.getSlotSize();
+                        kp.pmi[j].prefixSlotsNeeded++;
+                        kp.pmi[j].spaceBenefit += occurrenceThreshold*prefixBytes;                        
+                    }
+                    else if(kp.pmi[j].matches > occurrenceThreshold) {
+                        // we are beyond the occurrence threshold, every additional record with a matching prefix increases the benefit 
+                        kp.pmi[j].spaceBenefit += prefixBytes;
+                    }
+                }
+                else {
+                    kp.pmi[j].matches = 1;     
+                    break;
+                }         
+                
+                prevRec.nextField();
+                rec.nextField();
+            }
+            
+            //System.out.println();
+            //System.out.println("AFTER RECORD: " + i);
+            //kp.print();
+            //System.out.println("-----------------");
+                        
+            // this means not even the first field matched, so we start to consider a new "key partition"
+            if(maxCmps > 0 && prefixFieldsMatch == 0) {                
+            	//System.out.println("NEW KEY PARTITION");            	
+            	kp.lastRecSlotNum = i-1;
+            	
+                // remove keyPartitions that don't have enough records
+                if((kp.lastRecSlotNum - kp.firstRecSlotNum) + 1 < occurrenceThreshold) keyPartitions.remove(keyPartitions.size() - 1);
+                                
+            	kp = new KeyPartition(maxCmps);
+                keyPartitions.add(kp);
+                kp.firstRecSlotNum = i;
+            }         
+        }   
+        kp.lastRecSlotNum = numRecords - 1;
+        // remove keyPartitions that don't have enough records
+        if((kp.lastRecSlotNum - kp.firstRecSlotNum) + 1 < occurrenceThreshold) keyPartitions.remove(keyPartitions.size() - 1);
+        
+        return keyPartitions;
+    }
+    
+        
+    private class PrefixMatchInfo {
+        public int matches = 1;
+        public int spaceCost = 0;      
+        public int spaceBenefit = 0;
+        public int prefixSlotsNeeded = 0;     
+        public int prefixBytes = 0;
+    }
+    
+    private class KeyPartition {        
+        public int firstRecSlotNum;
+        public int lastRecSlotNum;
+        public PrefixMatchInfo[] pmi;
+        
+        public int maxBenefitMinusCost = 0;
+        public int maxPmiIndex = -1;
+        
+        public int totalUncompressedSpace = 0;
+        
+        public KeyPartition(int numKeyFields) {
+            pmi = new PrefixMatchInfo[numKeyFields];
+            for(int i = 0; i < numKeyFields; i++) {
+                pmi[i] = new PrefixMatchInfo();
+            }
+        }
+        
+        void print() {
+            for(int i = 0; i < pmi.length; i++) {
+                System.out.println("PMI: " + i + " | " 
+                        + "MATCHES: " + pmi[i].matches + " | "                         
+                        + "SPACECOST: " + pmi[i].spaceCost + " | "
+                        + "SPACEBENEFIT: " + pmi[i].spaceBenefit + " | "
+                        + "SLOTSNEEDED: " + pmi[i].prefixSlotsNeeded);
+            }
+        }     
+    }    
+    
+    private class SortByHeuristic implements Comparator<KeyPartition>{
+        @Override
+        public int compare(KeyPartition a, KeyPartition b) {
+            if(a.maxPmiIndex < 0) {
+                if(b.maxPmiIndex < 0) return 0;
+                return 1;
+            } else if(b.maxPmiIndex < 0) return -1;
+            
+            // non-negative maxPmiIndex, meaning a non-zero benefit exists
+            float thisHeuristicVal = (float)a.maxBenefitMinusCost / (float)a.pmi[a.maxPmiIndex].prefixSlotsNeeded; 
+            float otherHeuristicVal = (float)b.maxBenefitMinusCost / (float)b.pmi[b.maxPmiIndex].prefixSlotsNeeded;
+            if(thisHeuristicVal < otherHeuristicVal) return 1;
+            else if(thisHeuristicVal > otherHeuristicVal) return -1;
+            else return 0;
+        }
+    }
+    
+    private class SortByOriginalRank implements Comparator<KeyPartition>{            
+        @Override
+        public int compare(KeyPartition a, KeyPartition b) {
+            return a.firstRecSlotNum - b.firstRecSlotNum;
+        }
+    }
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/FieldPrefixNSMLeaf.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/FieldPrefixNSMLeaf.java
new file mode 100644
index 0000000..02a9c9e
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/FieldPrefixNSMLeaf.java
@@ -0,0 +1,646 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.frames;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.storage.am.btree.compressors.FieldPrefixCompressor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.FieldIterator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.FieldPrefixSlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.SlotOffRecOff;
+import edu.uci.ics.hyracks.storage.am.btree.impls.SplitKey;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrame;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IComparator;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFieldAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFrameCompressor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IPrefixSlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.SpaceStatus;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class FieldPrefixNSMLeaf implements IBTreeFrameLeaf {
+	
+    protected static final int pageLsnOff = 0;                              // 0
+    protected static final int numRecordsOff = pageLsnOff + 4;              // 4    
+    protected static final int freeSpaceOff = numRecordsOff + 4;      		// 8
+    protected static final int totalFreeSpaceOff = freeSpaceOff + 4;        // 12	
+	protected static final int levelOff = totalFreeSpaceOff + 4;         	// 16
+	protected static final int smFlagOff = levelOff + 1;                   	// 20
+	protected static final int numPrefixRecordsOff = smFlagOff + 1; 		// 21
+	
+	protected static final int prevLeafOff = numPrefixRecordsOff + 4;		// 22
+	protected static final int nextLeafOff = prevLeafOff + 4;				// 26
+	
+	protected ICachedPage page = null;
+    protected ByteBuffer buf = null;
+    protected IFrameCompressor compressor;
+    public IPrefixSlotManager slotManager; // TODO: should be protected, but will trigger some refactoring
+    
+    public FieldPrefixNSMLeaf() {
+        this.slotManager = new FieldPrefixSlotManager();
+        this.compressor = new FieldPrefixCompressor();        
+    }
+    
+    @Override
+    public void setPage(ICachedPage page) {
+        this.page = page;
+        this.buf = page.getBuffer();
+        slotManager.setFrame(this);
+    }
+    
+    @Override
+    public ByteBuffer getBuffer() {
+        return page.getBuffer();
+    }
+    
+    @Override
+    public ICachedPage getPage() {
+        return page;
+    }
+        
+    @Override
+    public void compress(MultiComparator cmp) throws Exception {
+        compressor.compress(this, cmp);
+    }
+    
+    // assumptions: 
+    // 1. prefix records are stored contiguously
+    // 2. prefix records are located before records (physically on the page)
+    // this procedure will not move prefix records
+    @Override
+    public void compact(MultiComparator cmp) {
+        resetSpaceParams();
+                        
+        int numRecords = buf.getInt(numRecordsOff);
+        byte[] data = buf.array();
+        
+        // determine start of target free space (depends on assumptions stated above)
+        int freeSpace = buf.getInt(freeSpaceOff);        
+        int numPrefixRecords = buf.getInt(numPrefixRecordsOff);
+        if(numPrefixRecords > 0) {
+        	int prefixFields = 0;
+        	for(int i = 0; i < numPrefixRecords; i++) {
+        		int prefixSlotOff = slotManager.getPrefixSlotOff(i);
+        		int prefixSlot = buf.getInt(prefixSlotOff);
+        		int prefixRecOff = slotManager.decodeSecondSlotField(prefixSlot);
+        		if(prefixRecOff >= freeSpace) {
+        			freeSpace = prefixRecOff;
+        			prefixFields = slotManager.decodeFirstSlotField(prefixSlot);
+        		}
+        	}
+        	for(int i = 0; i < prefixFields; i++) {
+        		freeSpace += cmp.getFields()[i].getLength(data, freeSpace);
+        	}
+        }
+        
+        ArrayList<SlotOffRecOff> sortedRecOffs = new ArrayList<SlotOffRecOff>();
+        sortedRecOffs.ensureCapacity(numRecords);
+        for(int i = 0; i < numRecords; i++) {           
+            int recSlotOff = slotManager.getRecSlotOff(i);
+            int recSlot = buf.getInt(recSlotOff);
+            int recOff = slotManager.decodeSecondSlotField(recSlot);
+            sortedRecOffs.add(new SlotOffRecOff(recSlotOff, recOff));
+        }
+        Collections.sort(sortedRecOffs);
+        
+        for(int i = 0; i < sortedRecOffs.size(); i++) {                    	
+        	int recOff = sortedRecOffs.get(i).recOff;
+            int recSlot = buf.getInt(sortedRecOffs.get(i).slotOff);
+            int prefixSlotNum = slotManager.decodeFirstSlotField(recSlot);            
+            
+            int fieldStart = 0;
+            if(prefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+            	int prefixSlotOff = slotManager.getPrefixSlotOff(prefixSlotNum);
+                int prefixSlot = buf.getInt(prefixSlotOff);
+                fieldStart = slotManager.decodeFirstSlotField(prefixSlot);                                
+            }
+            
+            int recRunner = recOff;
+            for(int j = fieldStart; j < cmp.getFields().length; j++) {
+            	recRunner += cmp.getFields()[j].getLength(data, recRunner);
+            }
+            int recSize = recRunner - recOff;
+                        
+            System.arraycopy(data, recOff, data, freeSpace, recSize);
+            slotManager.setSlot(sortedRecOffs.get(i).slotOff, slotManager.encodeSlotFields(prefixSlotNum, freeSpace));
+            freeSpace += recSize;
+        }
+        
+        buf.putInt(freeSpaceOff, freeSpace);
+        int totalFreeSpace = buf.capacity() - buf.getInt(freeSpaceOff) - ((buf.getInt(numRecordsOff) + buf.getInt(numPrefixRecordsOff)) * slotManager.getSlotSize());        
+        buf.putInt(totalFreeSpaceOff, totalFreeSpace);
+    }
+    
+    @Override
+    public void delete(byte[] data, MultiComparator cmp, boolean exactDelete) throws Exception {        
+        int slot = slotManager.findSlot(buf, data, cmp, true);
+        int recSlotNum = slotManager.decodeSecondSlotField(slot);
+        if(recSlotNum == FieldPrefixSlotManager.GREATEST_SLOT) {
+            throw new BTreeException("Key to be deleted does not exist.");   
+        }
+        else {
+            int prefixSlotNum = slotManager.decodeFirstSlotField(slot);            
+            int recSlotOff = slotManager.getRecSlotOff(recSlotNum);
+            
+            if(exactDelete) {                    
+                IComparator[] cmps = cmp.getComparators();
+                IFieldAccessor[] fields = cmp.getFields();
+                FieldIterator fieldIter = new FieldIterator(fields, this);                    
+                fieldIter.openRecSlotOff(recSlotOff);
+                
+                int dataRunner = 0;
+                for(int i = 0; i < cmp.getKeyLength(); i++) {
+                    dataRunner += fields[i].getLength(data, dataRunner);
+                    fieldIter.nextField();                   
+                }                
+                for(int i = cmp.getKeyLength(); i < fields.length; i++) {
+                    if(cmps[i].compare(data, dataRunner, buf.array(), fieldIter.getFieldOff()) != 0) {
+                        throw new BTreeException("Cannot delete record. Byte-by-byte comparison failed to prove equality.");
+                    }
+                    dataRunner += fields[i].getLength(data, dataRunner);
+                    fieldIter.nextField();    
+                }                                
+            }
+            
+            // perform deletion (we just do a memcpy to overwrite the slot)
+            int slotEndOff = slotManager.getRecSlotEndOff();
+            int length = recSlotOff - slotEndOff;
+            System.arraycopy(buf.array(), slotEndOff, buf.array(), slotEndOff + slotManager.getSlotSize(), length);
+            
+            // maintain space information, get size of record suffix (suffix could be entire record)
+            int recSize = 0;                        
+            int suffixFieldStart = 0;
+            FieldIterator fieldIter = new FieldIterator(cmp.getFields(), this);                 
+            fieldIter.openRecSlotOff(recSlotOff);
+            if(prefixSlotNum == FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+                suffixFieldStart = 0;
+            }
+            else {                
+                int prefixSlot = buf.getInt(slotManager.getPrefixSlotOff(prefixSlotNum));
+                suffixFieldStart = slotManager.decodeFirstSlotField(prefixSlot); 
+            }
+            
+            for(int i = 0; i < suffixFieldStart; i++) {
+                fieldIter.nextField();
+            }
+            for(int i = suffixFieldStart; i < cmp.getFields().length; i++) {
+                recSize += cmp.getFields()[i].getLength(buf.array(), fieldIter.getFieldOff());
+                fieldIter.nextField();
+            }
+            
+            buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) - 1);
+            buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + recSize + slotManager.getSlotSize());            
+        }    
+    }
+    
+    @Override
+    public SpaceStatus hasSpaceInsert(byte[] data, MultiComparator cmp) {                    	
+    	int freeContiguous = buf.capacity() - buf.getInt(freeSpaceOff) - ((buf.getInt(numRecordsOff) + buf.getInt(numPrefixRecordsOff)) * slotManager.getSlotSize());     	    	                        
+        
+        // see if the record would fit uncompressed
+        if(data.length + slotManager.getSlotSize() <= freeContiguous) return SpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;
+        
+        // see if record would fit into remaining space after compaction
+        if(data.length + slotManager.getSlotSize() <= buf.getInt(totalFreeSpaceOff)) return SpaceStatus.SUFFICIENT_SPACE;
+        
+        // see if the record matches a prefix and will fit after truncating the prefix
+        int prefixSlotNum = slotManager.findPrefix(data, cmp);
+        if(prefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+        	int prefixSlotOff = slotManager.getPrefixSlotOff(prefixSlotNum);
+        	int prefixSlot = buf.getInt(prefixSlotOff);
+        	int numPrefixFields = slotManager.decodeFirstSlotField(prefixSlot);
+        	
+        	int recRunner = 0;
+        	for(int i = 0; i < numPrefixFields; i++) {
+        		recRunner += cmp.getFields()[i].getLength(data, recRunner);
+        	}
+        	int compressedSize = data.length - recRunner;
+        	if(compressedSize + slotManager.getSlotSize() <= freeContiguous) return SpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;        	
+        }
+        
+        return SpaceStatus.INSUFFICIENT_SPACE;
+    }
+    
+    @Override
+    public SpaceStatus hasSpaceUpdate(int rid, byte[] data, MultiComparator cmp) {
+        // TODO Auto-generated method stub
+        return SpaceStatus.INSUFFICIENT_SPACE;
+    }
+
+    protected void resetSpaceParams() {    	
+    	buf.putInt(freeSpaceOff, getOrigFreeSpaceOff());
+        buf.putInt(totalFreeSpaceOff, getOrigTotalFreeSpace());
+    }
+    
+    @Override
+    public void initBuffer(byte level) {        
+        buf.putInt(pageLsnOff, 0); // TODO: might to set to a different lsn during creation
+        buf.putInt(numRecordsOff, 0);   
+        resetSpaceParams();
+        buf.putInt(numPrefixRecordsOff, 0);
+        buf.put(levelOff, level);
+        buf.put(smFlagOff, (byte)0);
+        buf.putInt(prevLeafOff, -1);
+		buf.putInt(nextLeafOff, -1);
+    }
+    
+    public void setTotalFreeSpace(int totalFreeSpace) {
+        buf.putInt(totalFreeSpaceOff, totalFreeSpace);
+    }
+    
+    public int getOrigTotalFreeSpace() {
+        return buf.capacity() - (nextLeafOff + 4);
+    }
+    
+    @Override
+    public void insert(byte[] data, MultiComparator cmp) throws Exception {        
+    	int slot = slotManager.findSlot(buf, data, cmp, false);        
+        slot = slotManager.insertSlot(slot, buf.getInt(freeSpaceOff));
+        
+        int suffixSize = data.length;
+        int suffixStartOff = 0;
+        int prefixSlotNum = slotManager.decodeFirstSlotField(slot);
+        
+        if(prefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+        	
+            int prefixSlotOff = slotManager.getPrefixSlotOff(prefixSlotNum);            
+            int prefixSlot = buf.getInt(prefixSlotOff);
+            int numPrefixFields = slotManager.decodeFirstSlotField(prefixSlot);
+                                    
+            // skip prefix fields
+            for(int i = 0; i < numPrefixFields; i++) {
+                suffixStartOff += cmp.getFields()[i].getLength(data, suffixStartOff);
+            }
+            
+            // compute suffix size
+            suffixSize = suffixStartOff; 
+            for(int i = numPrefixFields; i < cmp.getFields().length; i++) {
+                suffixSize += cmp.getFields()[i].getLength(data, suffixSize);
+            }
+            suffixSize -= suffixStartOff;                        
+        }
+        
+        int freeSpace = buf.getInt(freeSpaceOff);
+        System.arraycopy(data, suffixStartOff, buf.array(), freeSpace, suffixSize);            
+        buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+        buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + suffixSize);
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - suffixSize - slotManager.getSlotSize());
+               
+        //System.out.println(buf.getInt(totalFreeSpaceOff) + " " + buf.getInt(freeSpaceOff) + " " + buf.getInt(numRecordsOff));        
+        //System.out.println("COPIED: " + suffixSize + " / " + data.length);
+    }
+    
+    public void verifyPrefixes(MultiComparator cmp) {
+    	int numPrefixes = buf.getInt(numPrefixRecordsOff);
+    	int totalPrefixBytes = 0;
+    	for(int i = 0; i < numPrefixes; i++) {
+    		int prefixSlotOff = slotManager.getPrefixSlotOff(i);
+    		int prefixSlot = buf.getInt(prefixSlotOff);
+    		
+    		int numPrefixFields = slotManager.decodeFirstSlotField(prefixSlot);
+    		int prefixRecOff = slotManager.decodeSecondSlotField(prefixSlot);
+    		
+    		System.out.print("VERIFYING " + i + " : " + numPrefixFields + " " + prefixRecOff + " ");
+    		int recOffRunner = prefixRecOff;
+    		for(int j = 0; j < numPrefixFields; j++) {
+    			System.out.print(buf.getInt(prefixRecOff+j*4) + " ");
+    			int length = cmp.getFields()[j].getLength(buf.array(), recOffRunner);
+    			recOffRunner += length;
+    			totalPrefixBytes += length;    			
+    		}
+    		System.out.println();
+    	}    	        
+    	System.out.println("VER TOTALPREFIXBYTES: " + totalPrefixBytes + " " + numPrefixes);
+    }
+    
+    @Override
+    public void update(int rid, byte[] data) throws Exception {
+        // TODO Auto-generated method stub
+        
+    }   
+    
+    @Override
+    public void printHeader() {
+        // TODO Auto-generated method stub
+        
+    }       
+        
+    @Override
+    public int getNumRecords() {
+        return buf.getInt(numRecordsOff);
+    }
+    
+    public ISlotManager getSlotManager() {
+        return null; // TODO: dummy
+    }
+    
+    @Override
+    public String printKeys(MultiComparator cmp) {      
+        StringBuilder strBuilder = new StringBuilder();
+        FieldIterator rec = new FieldIterator(cmp.getFields(), this);      
+        int numRecords = buf.getInt(numRecordsOff);
+        for(int i = 0; i < numRecords; i++) {                               	
+        	rec.openRecSlotNum(i);        	
+        	//strBuilder.append(String.format("RECORD %5d: ", i));
+        	for(int j = 0; j < cmp.size(); j++) {               
+                strBuilder.append(cmp.getFields()[j].print(buf.array(), rec.getFieldOff()) + " ");
+                rec.nextField();
+            }
+            strBuilder.append(" | ");        	        	                           
+        }
+        strBuilder.append("\n");
+        return strBuilder.toString();
+    }
+    
+    @Override
+    public int getRecordOffset(int slotNum) {        
+    	return slotManager.getRecSlotOff(slotNum);
+    }
+    
+    @Override
+    public int getPageLsn() {
+        return buf.getInt(pageLsnOff);      
+    }
+
+    @Override
+    public void setPageLsn(int pageLsn) {
+        buf.putInt(pageLsnOff, pageLsn);        
+    }
+
+    @Override
+    public int getTotalFreeSpace() {
+        return buf.getInt(totalFreeSpaceOff);
+    }
+    
+	@Override
+	public boolean isLeaf() {
+		return buf.get(levelOff) == 0;
+	}
+			
+	@Override
+	public byte getLevel() {		
+		return buf.get(levelOff);
+	}
+	
+	@Override
+	public void setLevel(byte level) {
+		buf.put(levelOff, level);				
+	}
+	
+	@Override
+	public boolean getSmFlag() {
+		return buf.get(smFlagOff) != 0;
+	}
+
+	@Override
+	public void setSmFlag(boolean smFlag) {
+		if(smFlag)			
+			buf.put(smFlagOff, (byte)1);		
+		else
+			buf.put(smFlagOff, (byte)0);			
+	}		
+	
+	public int getNumPrefixRecords() {
+		return buf.getInt(numPrefixRecordsOff);
+	}
+	
+	public void setNumPrefixRecords(int numPrefixRecords) {
+		buf.putInt(numPrefixRecordsOff, numPrefixRecords);
+	}
+
+    @Override
+    public void insertSorted(byte[] data, MultiComparator cmp) throws Exception {
+        // TODO Auto-generated method stub
+        
+    }
+    
+    @Override
+    public int split(IBTreeFrame rightFrame, byte[] data, MultiComparator cmp, SplitKey splitKey) throws Exception {
+    	    	
+    	//System.out.println("ORIG");
+    	//String orig = printKeys(cmp);
+    	//System.out.println(orig);
+    	
+    	FieldPrefixNSMLeaf rf = (FieldPrefixNSMLeaf)rightFrame;
+    	
+    	// before doing anything check if key already exists
+		int slot = slotManager.findSlot(buf, data, cmp, true);
+		int recSlotNum = slotManager.decodeSecondSlotField(slot);	
+		if(recSlotNum != FieldPrefixSlotManager.GREATEST_SLOT) {
+			int prefixSlotNum = slotManager.decodeFirstSlotField(slot);				
+			if(slotManager.compareCompressed(data, buf.array(), prefixSlotNum, recSlotNum, cmp) == 0) {			
+				throw new BTreeException("Inserting duplicate key into unique index");
+			}
+		}
+		
+		ByteBuffer right = rf.getBuffer();
+		int numRecords = getNumRecords();
+		int numPrefixRecords = getNumPrefixRecords();
+		
+		int recordsToLeft;
+		int midSlotNum = numRecords / 2;
+		IBTreeFrame targetFrame = null;
+		int midSlotOff = slotManager.getRecSlotOff(midSlotNum);
+		int midSlot = buf.getInt(midSlotOff);
+		int midPrefixSlotNum = slotManager.decodeFirstSlotField(midSlot);
+		int midRecOff = slotManager.decodeSecondSlotField(midSlot);
+		int comparison = 0;
+		if(midPrefixSlotNum == FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+			comparison = cmp.compare(data, 0, buf.array(), midRecOff);
+		}		
+		else {			
+			comparison = slotManager.compareCompressed(data, buf.array(), midPrefixSlotNum, midSlotNum, cmp);
+		}
+				
+		if (comparison >= 0) {
+			recordsToLeft = midSlotNum + (numRecords % 2);
+			targetFrame = rf;
+		} else {
+			recordsToLeft = midSlotNum;
+			targetFrame = this;
+		}
+		int recordsToRight = numRecords - recordsToLeft;
+				
+		// copy entire page
+		System.arraycopy(buf.array(), 0, right.array(), 0, buf.capacity());
+				
+		// determine how many slots go on left and right page
+		int prefixesToLeft = numPrefixRecords;		
+		for(int i = recordsToLeft; i < numRecords; i++) {			
+			int recSlotOff = rf.slotManager.getRecSlotOff(i);						
+			int recSlot = right.getInt(recSlotOff);
+			int prefixSlotNum = rf.slotManager.decodeFirstSlotField(recSlot);			
+			if(prefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+				prefixesToLeft = prefixSlotNum;			
+				break;
+			}
+		}
+		
+		// if we are splitting in the middle of a prefix both pages need to have the prefix slot and record
+		int bounradyRecSlotOff = rf.slotManager.getRecSlotOff(recordsToLeft-1);
+		int boundaryRecSlot = buf.getInt(bounradyRecSlotOff);
+		int boundaryPrefixSlotNum = rf.slotManager.decodeFirstSlotField(boundaryRecSlot);
+		int prefixesToRight = numPrefixRecords - prefixesToLeft;
+		if(boundaryPrefixSlotNum == prefixesToLeft && boundaryPrefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+			prefixesToLeft++; // records on both pages share one prefix 
+		}			
+								
+		// move prefix records on right page to beginning of page and adjust prefix slots
+		if(prefixesToRight > 0 && prefixesToLeft > 0 && numPrefixRecords > 1) {			
+			
+			int freeSpace = rf.getOrigFreeSpaceOff();
+			int lastPrefixSlotNum = -1;
+			
+			for(int i = recordsToLeft; i < numRecords; i++) {			
+				int recSlotOff = rf.slotManager.getRecSlotOff(i);						
+				int recSlot = right.getInt(recSlotOff);
+				int prefixSlotNum = rf.slotManager.decodeFirstSlotField(recSlot);			
+				if(prefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {								
+					int prefixSlotOff = rf.slotManager.getPrefixSlotOff(prefixSlotNum);
+					int prefixSlot = right.getInt(prefixSlotOff);
+					int numPrefixFields = rf.slotManager.decodeFirstSlotField(prefixSlot);
+					
+					int prefixRecSize = 0;
+					if(lastPrefixSlotNum != prefixSlotNum) {
+						int prefixRecOff = rf.slotManager.decodeSecondSlotField(prefixSlot);
+						for(int j = 0; j < numPrefixFields; j++) {
+							prefixRecSize += cmp.getFields()[j].getLength(buf.array(), prefixRecOff + prefixRecSize);						
+						}
+						// copy from left page to make sure not to overwrite anything in right page that we may need later
+						System.arraycopy(buf.array(), prefixRecOff, right.array(), freeSpace, prefixRecSize);
+						
+						int newPrefixSlot = rf.slotManager.encodeSlotFields(numPrefixFields, freeSpace);
+						right.putInt(prefixSlotOff, newPrefixSlot);
+						
+						lastPrefixSlotNum = prefixSlotNum;
+					}
+					
+					int recOff = rf.slotManager.decodeSecondSlotField(recSlot);
+					int newRecSlot = rf.slotManager.encodeSlotFields(prefixSlotNum - (numPrefixRecords - prefixesToRight), recOff);
+					right.putInt(recSlotOff, newRecSlot);
+															
+					freeSpace += prefixRecSize;
+				}
+			}
+		}
+				
+		// move the modified prefix slots on the right page
+		int prefixSrc = rf.slotManager.getPrefixSlotEndOff();
+		int prefixDest = rf.slotManager.getPrefixSlotEndOff() + (numPrefixRecords - prefixesToRight) * rf.slotManager.getSlotSize();
+		int prefixLength = rf.slotManager.getSlotSize() * prefixesToRight;
+		System.arraycopy(right.array(), prefixSrc, right.array(), prefixDest, prefixLength);
+		
+		// on right page we need to copy rightmost record slots to left
+		int src = rf.slotManager.getRecSlotEndOff();
+		int dest = rf.slotManager.getRecSlotEndOff() + recordsToLeft * rf.slotManager.getSlotSize() + (numPrefixRecords - prefixesToRight) * rf.slotManager.getSlotSize();
+		int length = rf.slotManager.getSlotSize() * recordsToRight;				
+		System.arraycopy(right.array(), src, right.array(), dest, length);
+		
+		right.putInt(numRecordsOff, recordsToRight);
+		right.putInt(numPrefixRecordsOff, prefixesToRight);
+				
+		// on left page move slots to reflect possibly removed prefixes
+		src = slotManager.getRecSlotEndOff() + recordsToRight * slotManager.getSlotSize();
+		dest = slotManager.getRecSlotEndOff() + recordsToRight * slotManager.getSlotSize() + (numPrefixRecords - prefixesToLeft) * slotManager.getSlotSize();
+		length = slotManager.getSlotSize() * recordsToLeft;				
+		System.arraycopy(buf.array(), src, buf.array(), dest, length);
+		
+		buf.putInt(numRecordsOff, recordsToLeft);
+		buf.putInt(numPrefixRecordsOff, prefixesToLeft);	
+					
+		
+		//System.out.println("VERIFY LEFT");
+		//verifyPrefixes(cmp);
+		
+		//System.out.println("VERIFY RIGHT");
+		//rf.verifyPrefixes(cmp);
+		
+		String a = printKeys(cmp);
+		String b = rightFrame.printKeys(cmp);		 
+		//System.out.println("BEFORE COMPACT");
+		//System.out.println(a);		
+		//System.out.println(b);
+		
+		// compact both pages		
+		compact(cmp);
+		rightFrame.compact(cmp);
+						
+		a = printKeys(cmp);
+		b = rightFrame.printKeys(cmp);		 
+		//System.out.println("AFTER COMPACT");
+		//System.out.println(a);		
+		//System.out.println(b);
+		
+		//System.out.println("AFTER SPLIT REC: " + getNumRecords() + " " + rightFrame.getNumRecords());
+		//System.out.println("AFTER SPLIT TOT: " + getTotalFreeSpace() + " " + rightFrame.getTotalFreeSpace());
+		
+		// insert last key
+		targetFrame.insert(data, cmp);			
+		
+		// set split key to be highest value in left page		
+		int splitKeyRecSlotOff = slotManager.getRecSlotEndOff();		
+		
+		int keySize = 0;
+		FieldIterator fieldIter = new FieldIterator(cmp.getFields(), this);
+		fieldIter.openRecSlotOff(splitKeyRecSlotOff);		
+		for(int i = 0; i < cmp.getKeyLength(); i++) {
+			keySize += fieldIter.getFieldSize();
+			fieldIter.nextField();
+		}									
+		splitKey.initData(keySize);
+		fieldIter.copyFields(0, cmp.getKeyLength()-1, splitKey.getData(), 0);
+		
+		return 0;
+    }
+    
+	@Override
+	public int getFreeSpaceOff() {
+		return buf.getInt(freeSpaceOff);
+	}
+	
+	public int getOrigFreeSpaceOff() {
+		return nextLeafOff + 4;
+	}
+	
+	@Override
+	public void setFreeSpaceOff(int freeSpace) {
+		buf.putInt(freeSpaceOff, freeSpace);		
+	}
+	
+	@Override
+	public void setNextLeaf(int page) {
+		buf.putInt(nextLeafOff, page);
+	}
+
+	@Override
+	public void setPrevLeaf(int page) {
+		buf.putInt(prevLeafOff, page);
+	}
+
+	@Override
+	public int getNextLeaf() {
+		return buf.getInt(nextLeafOff);
+	}
+
+	@Override
+	public int getPrevLeaf() {
+		return buf.getInt(prevLeafOff);
+	}
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/FieldPrefixNSMLeafFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/FieldPrefixNSMLeafFactory.java
new file mode 100644
index 0000000..b01dc4b
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/FieldPrefixNSMLeafFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.frames;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeafFactory;
+
+public class FieldPrefixNSMLeafFactory implements IBTreeFrameLeafFactory {
+	@Override
+	public IBTreeFrameLeaf getFrame() {		
+		return new FieldPrefixNSMLeaf();
+	}
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
new file mode 100644
index 0000000..50137cf
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -0,0 +1,1299 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.util.ArrayList;
+import java.util.Stack;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrame;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameInterior;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameInteriorFactory;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeafFactory;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameMeta;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManagerFactory;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.SpaceStatus;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+
+public class BTree {
+    private static final Logger LOGGER = Logger.getLogger(BTree.class.getName());
+
+    private final static int RESTART_OP = Integer.MIN_VALUE;
+    private final static int MAX_RESTARTS = 10;
+
+    private final int metaDataPage = 0; // page containing meta data, e.g.,
+                                        // maxPage
+    private final int rootPage = 1; // the root page never changes
+
+    private final IBufferCache bufferCache;
+    private int fileId;
+    private final IBTreeFrameInteriorFactory interiorFrameFactory;
+    private final IBTreeFrameLeafFactory leafFrameFactory;
+    private final ISlotManagerFactory interiorSlotManagerFactory;
+    private final ISlotManagerFactory leafSlotManagerFactory;
+    private final MultiComparator cmp;
+    private final ReadWriteLock treeLatch;
+
+    public int rootSplits = 0;
+    public int[] splitsByLevel = new int[500];
+    public long readLatchesAcquired = 0;
+    public long readLatchesReleased = 0;
+    public long writeLatchesAcquired = 0;
+    public long writeLatchesReleased = 0;
+    public long pins = 0;
+    public long unpins = 0;
+
+    public long treeLatchesAcquired = 0;
+    public long treeLatchesReleased = 0;
+
+    public byte currentLevel = 0;
+
+    public int usefulCompression = 0;
+    public int uselessCompression = 0;
+
+    public String printStats() {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append("\n");
+        strBuilder.append("ROOTSPLITS: " + rootSplits + "\n");
+        strBuilder.append("SPLITS BY LEVEL\n");
+        for (int i = 0; i < currentLevel; i++) {
+            strBuilder.append(String.format("%3d ", i) + String.format("%8d ", splitsByLevel[i]) + "\n");
+        }
+        strBuilder.append(String.format("READ LATCHES:  %10d %10d\n", readLatchesAcquired, readLatchesReleased));
+        strBuilder.append(String.format("WRITE LATCHES: %10d %10d\n", writeLatchesAcquired, writeLatchesReleased));
+        strBuilder.append(String.format("PINS:          %10d %10d\n", pins, unpins));
+        return strBuilder.toString();
+    }
+
+    public BTree(IBufferCache bufferCache, IBTreeFrameInteriorFactory interiorFrameFactory,
+            IBTreeFrameLeafFactory leafFrameFactory, ISlotManagerFactory interiorSlotManagerFactory,
+            ISlotManagerFactory leafSlotManagerFactory, MultiComparator cmp) {
+        this.bufferCache = bufferCache;
+        this.interiorFrameFactory = interiorFrameFactory;
+        this.leafFrameFactory = leafFrameFactory;
+        this.interiorSlotManagerFactory = interiorSlotManagerFactory;
+        this.leafSlotManagerFactory = leafSlotManagerFactory;
+        this.cmp = cmp;
+        this.treeLatch = new ReentrantReadWriteLock(true);
+    }
+
+    public void create(int fileId, IBTreeFrameLeaf leafFrame, IBTreeFrameMeta metaFrame) throws Exception {
+        // initialize meta data page
+        ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
+        pins++;
+
+        metaNode.acquireWriteLatch();
+        writeLatchesAcquired++;
+        try {
+            metaFrame.setPage(metaNode);
+            metaFrame.initBuffer((byte) -1);
+            metaFrame.setMaxPage(rootPage);
+        } finally {
+            metaNode.releaseWriteLatch();
+            writeLatchesReleased++;
+            bufferCache.unpin(metaNode);
+            unpins++;
+        }
+
+        // initialize root page
+        ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), true);
+        pins++;
+
+        rootNode.acquireWriteLatch();
+        writeLatchesAcquired++;
+        try {
+            leafFrame.setPage(rootNode);
+            leafFrame.initBuffer((byte) 0);
+        } finally {
+            rootNode.releaseWriteLatch();
+            writeLatchesReleased++;
+            bufferCache.unpin(rootNode);
+            unpins++;
+        }
+        currentLevel = 0;
+    }
+
+    public void open(int fileId) {
+        this.fileId = fileId;
+    }
+
+    public void close() {
+        fileId = -1;
+    }
+
+    private int getFreePage(IBTreeFrameMeta metaFrame) throws Exception {
+        ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
+        pins++;
+
+        metaNode.acquireWriteLatch();
+        writeLatchesAcquired++;
+
+        int freePage = -1;
+        try {
+            metaFrame.setPage(metaNode);
+            freePage = metaFrame.getFreePage();
+            if (freePage < 0) { // no free page entry on this page
+                int nextPage = metaFrame.getNextPage();
+                if (nextPage > 0) { // sibling may have free pages
+                    ICachedPage nextNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, nextPage), false);
+                    pins++;
+
+                    nextNode.acquireWriteLatch();
+                    writeLatchesAcquired++;
+                    // we copy over the free space entries of nextpage into the
+                    // first meta page (metaDataPage)
+                    // we need to link the first page properly to the next page
+                    // of nextpage
+                    try {
+                        // remember entries that remain unchanged
+                        int maxPage = metaFrame.getMaxPage();
+
+                        // copy entire page (including sibling pointer, free
+                        // page entries, and all other info)
+                        // after this copy nextPage is considered a free page
+                        System.arraycopy(nextNode.getBuffer().array(), 0, metaNode.getBuffer().array(), 0, nextNode
+                                .getBuffer().capacity());
+
+                        // reset unchanged entry
+                        metaFrame.setMaxPage(maxPage);
+
+                        freePage = metaFrame.getFreePage();
+                        // sibling also has no free pages, this "should" not
+                        // happen, but we deal with it anyway just to be safe
+                        if (freePage < 0) {
+                            freePage = nextPage;
+                        } else {
+                            metaFrame.addFreePage(nextPage);
+                        }
+                    } finally {
+                        nextNode.releaseWriteLatch();
+                        writeLatchesReleased++;
+                        bufferCache.unpin(nextNode);
+                        unpins++;
+                    }
+                } else {
+                    freePage = metaFrame.getMaxPage();
+                    freePage++;
+                    metaFrame.setMaxPage(freePage);
+                }
+            }
+        } finally {
+            metaNode.releaseWriteLatch();
+            writeLatchesReleased++;
+            bufferCache.unpin(metaNode);
+            unpins++;
+        }
+
+        return freePage;
+    }
+
+    private void addFreePages(BTreeOpContext ctx) throws Exception {
+        for (Integer i : ctx.freePages) {
+            addFreePage(ctx.metaFrame, i);
+        }
+        ctx.freePages.clear();
+    }
+
+    private void addFreePage(IBTreeFrameMeta metaFrame, int freePage) throws Exception {
+        // root page is special, don't add it to free pages
+        if (freePage == rootPage)
+            return;
+
+        ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
+        pins++;
+
+        metaNode.acquireWriteLatch();
+        writeLatchesAcquired++;
+
+        metaFrame.setPage(metaNode);
+
+        try {
+            if (metaFrame.hasSpace()) {
+                metaFrame.addFreePage(freePage);
+            } else {
+                // allocate a new page in the chain of meta pages
+                int newPage = metaFrame.getFreePage();
+                if (newPage < 0) {
+                    throw new Exception("Inconsistent Meta Page State. It has no space, but it also has no entries.");
+                }
+
+                ICachedPage newNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, newPage), false);
+                pins++;
+
+                newNode.acquireWriteLatch();
+                writeLatchesAcquired++;
+
+                try {
+                    int metaMaxPage = metaFrame.getMaxPage();
+
+                    // copy metaDataPage to newNode
+                    System.arraycopy(metaNode.getBuffer().array(), 0, newNode.getBuffer().array(), 0, metaNode
+                            .getBuffer().capacity());
+
+                    metaFrame.initBuffer(-1);
+                    metaFrame.setNextPage(newPage);
+                    metaFrame.setMaxPage(metaMaxPage);
+                    metaFrame.addFreePage(freePage);
+                } finally {
+                    newNode.releaseWriteLatch();
+                    writeLatchesReleased++;
+
+                    bufferCache.unpin(newNode);
+                    unpins++;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            metaNode.releaseWriteLatch();
+            writeLatchesReleased++;
+
+            bufferCache.unpin(metaNode);
+            unpins++;
+        }
+    }
+
+    public void printTree(IBTreeFrameLeaf leafFrame, IBTreeFrameInterior interiorFrame) throws Exception {
+        printTree(rootPage, null, false, leafFrame, interiorFrame);
+    }
+
+    public void printTree(int pageId, ICachedPage parent, boolean unpin, IBTreeFrameLeaf leafFrame,
+            IBTreeFrameInterior interiorFrame) throws Exception {
+
+        ICachedPage node = bufferCache.pin(FileInfo.getDiskPageId(fileId, pageId), false);
+        pins++;
+        node.acquireReadLatch();
+        readLatchesAcquired++;
+
+        try {
+            if (parent != null && unpin == true) {
+                parent.releaseReadLatch();
+                readLatchesReleased++;
+
+                bufferCache.unpin(parent);
+                unpins++;
+            }
+
+            interiorFrame.setPage(node);
+            int level = interiorFrame.getLevel();
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(String.format("%1d ", level));
+                LOGGER.finest(String.format("%3d ", pageId));
+                for (int i = 0; i < currentLevel - level; i++)
+                    LOGGER.finest("    ");
+
+                String keyString;
+                if (interiorFrame.isLeaf()) {
+                    leafFrame.setPage(node);
+                    keyString = leafFrame.printKeys(cmp);
+                } else {
+                    keyString = interiorFrame.printKeys(cmp);
+                }
+
+                LOGGER.finest(keyString);
+            }
+
+            if (!interiorFrame.isLeaf()) {
+                ArrayList<Integer> children = ((BTreeNSMInterior) (interiorFrame)).getChildren(cmp);
+                for (int i = 0; i < children.size(); i++) {
+                    printTree(children.get(i), node, i == children.size() - 1, leafFrame, interiorFrame);
+                }
+            } else {
+                node.releaseReadLatch();
+                readLatchesReleased++;
+
+                bufferCache.unpin(node);
+                unpins++;
+            }
+        } catch (Exception e) {
+            node.releaseReadLatch();
+            readLatchesReleased++;
+
+            bufferCache.unpin(node);
+            unpins++;
+        }
+    }
+
+    public void diskOrderScan(BTreeDiskOrderScanCursor cursor, IBTreeFrameLeaf leafFrame, IBTreeFrameMeta metaFrame)
+            throws Exception {
+        int currentPageId = rootPage + 1;
+        int maxPageId = -1;
+
+        ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
+        pins++;
+
+        metaNode.acquireReadLatch();
+        readLatchesAcquired++;
+
+        try {
+            metaFrame.setPage(metaNode);
+            maxPageId = metaFrame.getMaxPage();
+        } finally {
+            metaNode.releaseReadLatch();
+            readLatchesAcquired++;
+
+            bufferCache.unpin(metaNode);
+            unpins++;
+        }
+
+        ICachedPage page = bufferCache.pin(FileInfo.getDiskPageId(fileId, currentPageId), false);
+        page.acquireReadLatch();
+        cursor.setBufferCache(bufferCache);
+        cursor.setFileId(fileId);
+        cursor.setCurrentPageId(currentPageId);
+        cursor.setMaxPageId(maxPageId);
+        cursor.open(page, null);
+    }
+
+    public void search(IBTreeCursor cursor, RangePredicate pred, IBTreeFrameLeaf leafFrame,
+            IBTreeFrameInterior interiorFrame) throws Exception {
+        BTreeOpContext ctx = new BTreeOpContext();
+        ctx.op = BTreeOp.BTO_SEARCH;
+        ctx.leafFrame = leafFrame;
+        ctx.interiorFrame = interiorFrame;
+        ctx.metaFrame = null;
+        ctx.pred = pred;
+        ctx.cursor = cursor;
+        ctx.pageLsns = new Stack<Integer>();
+        if (ctx.pred.getComparator() == null)
+            ctx.pred.setComparator(cmp); // simple index scan
+
+        boolean repeatOp = true;
+        // we use this loop to deal with possibly multiple operation restarts
+        // due to ongoing structure modifications during the descent
+        while (repeatOp && ctx.opRestarts < MAX_RESTARTS) {
+            performOp(rootPage, null, ctx);
+
+            // if we reach this stage then we need to restart from the (possibly
+            // new) root
+            if (!ctx.pageLsns.isEmpty() && ctx.pageLsns.peek().equals(RESTART_OP)) {
+                ctx.pageLsns.pop(); // pop the restart op indicator
+                continue;
+            }
+
+            repeatOp = false;
+        }
+
+        cursor.setBufferCache(bufferCache);
+        cursor.setFileId(fileId);
+    }
+
+    private void unsetSmPages(BTreeOpContext ctx) throws Exception {
+        ICachedPage originalPage = ctx.interiorFrame.getPage();
+        for (Integer i : ctx.smPages) {
+            ICachedPage smPage = bufferCache.pin(FileInfo.getDiskPageId(fileId, i), false);
+            pins++;
+            smPage.acquireWriteLatch(); // TODO: would like to set page dirty without latching
+            writeLatchesAcquired++;
+            try {
+                ctx.interiorFrame.setPage(smPage);
+                ctx.interiorFrame.setSmFlag(false);
+            } finally {
+                smPage.releaseWriteLatch();
+                writeLatchesReleased++;
+                bufferCache.unpin(smPage);
+                unpins++;
+            }
+        }
+        if (ctx.smPages.size() > 0) {
+            treeLatch.writeLock().unlock();
+            treeLatchesReleased++;
+            ctx.smPages.clear();
+        }
+        ctx.interiorFrame.setPage(originalPage);
+    }
+
+    private void createNewRoot(BTreeOpContext ctx) throws Exception {
+        rootSplits++; // debug
+        splitsByLevel[currentLevel]++;
+        currentLevel++;
+
+        // make sure the root is always at the same level
+        ICachedPage leftNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, ctx.splitKey.getLeftPage()), false);
+        pins++;
+        leftNode.acquireWriteLatch(); // TODO: think about whether latching is really required        
+        writeLatchesAcquired++;
+        try {
+            ICachedPage rightNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, ctx.splitKey.getRightPage()), false);
+            pins++;
+            rightNode.acquireWriteLatch(); // TODO: think about whether latching is really required
+            writeLatchesAcquired++;
+            try {
+                int newLeftId = getFreePage(ctx.metaFrame);
+                ICachedPage newLeftNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, newLeftId), true);
+                pins++;
+                newLeftNode.acquireWriteLatch(); // TODO: think about whether latching is really required
+                writeLatchesAcquired++;
+                try {
+                    // copy left child to new left child
+                    System.arraycopy(leftNode.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0, newLeftNode
+                            .getBuffer().capacity());
+                    ctx.interiorFrame.setPage(newLeftNode);
+                    ctx.interiorFrame.setSmFlag(false);
+
+                    // change sibling pointer if children are leaves
+                    ctx.leafFrame.setPage(rightNode);
+                    if (ctx.leafFrame.isLeaf()) {
+                        ctx.leafFrame.setPrevLeaf(newLeftId);
+                    }
+
+                    // initialize new root (leftNode becomes new root)
+                    ctx.interiorFrame.setPage(leftNode);
+                    ctx.interiorFrame.initBuffer((byte) (ctx.leafFrame.getLevel() + 1));
+                    ctx.interiorFrame.setSmFlag(true); // will be cleared later
+                    // in unsetSmPages
+                    ctx.splitKey.setLeftPage(newLeftId);
+                    ctx.interiorFrame.insert(ctx.splitKey.getData(), cmp);
+                } finally {
+                    newLeftNode.releaseWriteLatch();
+                    writeLatchesReleased++;
+                    bufferCache.unpin(newLeftNode);
+                    unpins++;
+                }
+            } finally {
+                rightNode.releaseWriteLatch();
+                writeLatchesReleased++;
+                bufferCache.unpin(rightNode);
+                unpins++;
+            }
+        } finally {
+            leftNode.releaseWriteLatch();
+            writeLatchesReleased++;
+            bufferCache.unpin(leftNode);
+            unpins++;
+        }
+    }
+
+    public void insert(byte[] data, IBTreeFrameLeaf leafFrame, IBTreeFrameInterior interiorFrame,
+            IBTreeFrameMeta metaFrame) throws Exception {
+        BTreeOpContext ctx = new BTreeOpContext();
+        ctx.op = BTreeOp.BTO_INSERT;
+        ctx.leafFrame = leafFrame;
+        ctx.interiorFrame = interiorFrame;
+        ctx.metaFrame = metaFrame;
+        ctx.pred = new RangePredicate(true, data, data, cmp);
+        ctx.splitKey = new SplitKey();
+        ctx.smPages = new ArrayList<Integer>();
+        ctx.pageLsns = new Stack<Integer>();
+
+        boolean repeatOp = true;
+        // we use this loop to deal with possibly multiple operation restarts
+        // due to ongoing structure modifications during the descent
+        while (repeatOp && ctx.opRestarts < MAX_RESTARTS) {
+            performOp(rootPage, null, ctx);
+
+            // if we reach this stage then we need to restart from the (possibly
+            // new) root
+            if (!ctx.pageLsns.isEmpty() && ctx.pageLsns.peek().equals(RESTART_OP)) {
+                ctx.pageLsns.pop(); // pop the restart op indicator
+                continue;
+            }
+
+            // we split the root, here is the key for a new root
+            if (ctx.splitKey.getData() != null) {
+                createNewRoot(ctx);
+            }
+
+            unsetSmPages(ctx);
+
+            repeatOp = false;
+        }
+    }
+
+    private void insertLeaf(ICachedPage node, int pageId, byte[] data, BTreeOpContext ctx) throws Exception {
+        ctx.leafFrame.setPage(node);
+        SpaceStatus spaceStatus = ctx.leafFrame.hasSpaceInsert(data, cmp);
+        switch (spaceStatus) {
+
+            case SUFFICIENT_CONTIGUOUS_SPACE: {
+                // System.out.println("INSERT LEAF");
+                ctx.leafFrame.insert(data, cmp);
+                ctx.splitKey.reset();
+            }
+                break;
+
+            case SUFFICIENT_SPACE: {
+                ctx.leafFrame.compact(cmp);
+                ctx.leafFrame.insert(data, cmp);
+                ctx.splitKey.reset();
+            }
+                break;
+
+            case INSUFFICIENT_SPACE: {
+                // try compressing the page first and see if there is space available    		
+                ctx.leafFrame.compress(cmp);
+                spaceStatus = ctx.leafFrame.hasSpaceInsert(data, cmp);
+
+                if (spaceStatus == SpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+                    ctx.leafFrame.insert(data, cmp);
+                    ctx.splitKey.reset();
+
+                    usefulCompression++;
+                } else {
+                    uselessCompression++;
+
+                    // perform split
+                    splitsByLevel[0]++; // debug
+                    int rightSiblingPageId = ctx.leafFrame.getNextLeaf();
+                    ICachedPage rightSibling = null;
+                    if (rightSiblingPageId > 0) {
+                        rightSibling = bufferCache.pin(FileInfo.getDiskPageId(fileId, rightSiblingPageId), false);
+                        pins++;
+                    }
+
+                    treeLatch.writeLock().lock(); // lock is released in
+                    // unsetSmPages(), after sm has
+                    // fully completed
+                    treeLatchesAcquired++;
+                    try {
+
+                        int rightPageId = getFreePage(ctx.metaFrame);
+                        ICachedPage rightNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rightPageId), true);
+                        pins++;
+                        rightNode.acquireWriteLatch();
+                        writeLatchesAcquired++;
+                        try {
+                            ISlotManager rightSlotManager = leafSlotManagerFactory.getSlotManager();
+                            IBTreeFrameLeaf rightFrame = leafFrameFactory.getFrame();
+                            rightFrame.setPage(rightNode);
+                            rightFrame.initBuffer((byte) 0);
+
+                            int ret = ctx.leafFrame.split(rightFrame, data, cmp, ctx.splitKey);
+
+                            ctx.smPages.add(pageId);
+                            ctx.smPages.add(rightPageId);
+                            ctx.leafFrame.setSmFlag(true);
+                            rightFrame.setSmFlag(true);
+
+                            rightFrame.setNextLeaf(ctx.leafFrame.getNextLeaf());
+                            rightFrame.setPrevLeaf(pageId);
+                            ctx.leafFrame.setNextLeaf(rightPageId);
+
+                            // TODO: we just use increasing numbers as pageLsn, we
+                            // should tie this together with the LogManager and
+                            // TransactionManager
+                            rightFrame.setPageLsn(rightFrame.getPageLsn() + 1);
+                            ctx.leafFrame.setPageLsn(ctx.leafFrame.getPageLsn() + 1);
+
+                            if (ret != 0) {
+                                ctx.splitKey.reset();
+                            } else {
+                                // System.out.print("LEAF SPLITKEY: ");
+                                // cmp.printKey(splitKey.getData(), 0);
+                                // System.out.println("");
+
+                                ctx.splitKey.setPages(pageId, rightPageId);
+                            }
+                            if (rightSibling != null) {
+                                rightSibling.acquireWriteLatch();
+                                writeLatchesAcquired++;
+                                try {
+                                    rightFrame.setPage(rightSibling); // reuse
+                                    // rightFrame
+                                    // for
+                                    // modification
+                                    rightFrame.setPrevLeaf(rightPageId);
+                                } finally {
+                                    rightSibling.releaseWriteLatch();
+                                    writeLatchesReleased++;
+                                }
+                            }
+                        } finally {
+                            rightNode.releaseWriteLatch();
+                            writeLatchesReleased++;
+                            bufferCache.unpin(rightNode);
+                            unpins++;
+                        }
+                    } catch (Exception e) {
+                        treeLatch.writeLock().unlock();
+                        treeLatchesReleased++;
+                        throw e;
+                    } finally {
+                        if (rightSibling != null) {
+                            bufferCache.unpin(rightSibling);
+                            unpins++;
+                        }
+                    }
+                }
+            }
+                break;
+
+        }
+
+        node.releaseWriteLatch();
+        writeLatchesReleased++;
+        bufferCache.unpin(node);
+        unpins++;
+    }
+
+    private void insertInterior(ICachedPage node, int pageId, byte[] data, BTreeOpContext ctx) throws Exception {
+        ctx.interiorFrame.setPage(node);
+        SpaceStatus spaceStatus = ctx.interiorFrame.hasSpaceInsert(data, cmp);
+        switch (spaceStatus) {
+            case INSUFFICIENT_SPACE: {
+                // System.out.println("SPLIT INTERIOR! " + pageId);
+                splitsByLevel[ctx.interiorFrame.getLevel()]++; // debug
+                int rightPageId = getFreePage(ctx.metaFrame);
+                ICachedPage rightNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rightPageId), true);
+                pins++;
+                rightNode.acquireWriteLatch();
+                writeLatchesAcquired++;
+                try {
+                    ISlotManager rightSlotManager = interiorSlotManagerFactory.getSlotManager();
+                    IBTreeFrame rightFrame = interiorFrameFactory.getFrame();
+                    rightFrame.setPage(rightNode);
+                    rightFrame.initBuffer((byte) ctx.interiorFrame.getLevel());
+                    // instead of creating a new split key, use the existing
+                    // splitKey
+                    int ret = ctx.interiorFrame.split(rightFrame, ctx.splitKey.getData(), cmp, ctx.splitKey);
+
+                    ctx.smPages.add(pageId);
+                    ctx.smPages.add(rightPageId);
+                    ctx.interiorFrame.setSmFlag(true);
+                    rightFrame.setSmFlag(true);
+
+                    // TODO: we just use increasing numbers as pageLsn, we
+                    // should tie this together with the LogManager and
+                    // TransactionManager
+                    rightFrame.setPageLsn(rightFrame.getPageLsn() + 1);
+                    ctx.interiorFrame.setPageLsn(ctx.interiorFrame.getPageLsn() + 1);
+
+                    if (ret != 0) {
+                        ctx.splitKey.reset();
+                    } else {
+                        // System.out.print("INTERIOR SPLITKEY: ");
+                        // cmp.printKey(splitKey.getData(), 0);
+                        // System.out.println("");
+
+                        ctx.splitKey.setPages(pageId, rightPageId);
+                    }
+                } finally {
+                    rightNode.releaseWriteLatch();
+                    writeLatchesReleased++;
+                    bufferCache.unpin(rightNode);
+                    unpins++;
+                }
+            }
+                break;
+
+            case SUFFICIENT_CONTIGUOUS_SPACE: {
+                // System.out.println("INSERT INTERIOR: " + pageId);
+                ctx.interiorFrame.insert(data, cmp);
+                ctx.splitKey.reset();
+            }
+                break;
+
+            case SUFFICIENT_SPACE: {
+                ctx.interiorFrame.compact(cmp);
+                ctx.interiorFrame.insert(data, cmp);
+                ctx.splitKey.reset();
+            }
+                break;
+
+        }
+    }
+
+    public void delete(byte[] data, IBTreeFrameLeaf leafFrame, IBTreeFrameInterior interiorFrame,
+            IBTreeFrameMeta metaFrame) throws Exception {
+        BTreeOpContext ctx = new BTreeOpContext();
+        ctx.op = BTreeOp.BTO_DELETE;
+        ctx.leafFrame = leafFrame;
+        ctx.interiorFrame = interiorFrame;
+        ctx.metaFrame = metaFrame;
+        ctx.pred = new RangePredicate(true, data, data, cmp);
+        ctx.splitKey = new SplitKey();
+        ctx.smPages = new ArrayList<Integer>();
+        ctx.pageLsns = new Stack<Integer>();
+        ctx.freePages = new ArrayList<Integer>();
+
+        boolean repeatOp = true;
+        // we use this loop to deal with possibly multiple operation restarts
+        // due to ongoing structure modifications during the descent
+        while (repeatOp && ctx.opRestarts < MAX_RESTARTS) {
+            performOp(rootPage, null, ctx);
+
+            // if we reach this stage then we need to restart from the (possibly
+            // new) root
+            if (!ctx.pageLsns.isEmpty() && ctx.pageLsns.peek().equals(RESTART_OP)) {
+                ctx.pageLsns.pop(); // pop the restart op indicator
+                continue;
+            }
+
+            // tree is empty, reset level to zero
+            if (ctx.splitKey.getData() != null) {
+                ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), false);
+                pins++;
+                rootNode.acquireWriteLatch();
+                writeLatchesAcquired++;
+                try {
+                    leafFrame.setPage(rootNode);
+                    leafFrame.initBuffer((byte) 0);
+                    currentLevel = 0; // debug
+                } finally {
+                    rootNode.releaseWriteLatch();
+                    writeLatchesReleased++;
+                    bufferCache.unpin(rootNode);
+                    unpins++;
+                }
+            }
+
+            unsetSmPages(ctx);
+
+            addFreePages(ctx);
+
+            repeatOp = false;
+        }
+    }
+
+    // TODO: to avoid latch deadlock, must modify cursor to detect empty leaves
+    private void deleteLeaf(ICachedPage node, int pageId, byte[] data, BTreeOpContext ctx) throws Exception {
+        ctx.leafFrame.setPage(node);
+
+        // will this leaf become empty?
+        if (ctx.leafFrame.getNumRecords() == 1) {
+            ISlotManager siblingSlotManager = leafSlotManagerFactory.getSlotManager();
+            IBTreeFrameLeaf siblingFrame = leafFrameFactory.getFrame();
+
+            ICachedPage leftNode = null;
+            ICachedPage rightNode = null;
+            int nextLeaf = ctx.leafFrame.getNextLeaf();
+            int prevLeaf = ctx.leafFrame.getPrevLeaf();
+
+            if (prevLeaf > 0)
+                leftNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, prevLeaf), false);
+
+            try {
+
+                if (nextLeaf > 0)
+                    rightNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, nextLeaf), false);
+
+                try {
+                    treeLatch.writeLock().lock();
+                    treeLatchesAcquired++;
+
+                    try {
+                        ctx.leafFrame.delete(data, cmp, true);
+                        // to propagate the deletion we only need to make the
+                        // splitKey != null
+                        // we can reuse data to identify which key to delete in
+                        // the parent
+                        ctx.splitKey.initData(1);
+                    } catch (Exception e) {
+                        // don't propagate deletion upwards if deletion at this
+                        // level fails
+                        ctx.splitKey.reset();
+                        throw e;
+                    }
+
+                    ctx.leafFrame.setPageLsn(ctx.leafFrame.getPageLsn() + 1); // TODO:
+                    // tie
+                    // together
+                    // with
+                    // logging
+
+                    ctx.smPages.add(pageId);
+                    ctx.leafFrame.setSmFlag(true);
+
+                    node.releaseWriteLatch();
+                    writeLatchesReleased++;
+                    bufferCache.unpin(node);
+                    unpins++;
+
+                    if (leftNode != null) {
+                        leftNode.acquireWriteLatch();
+                        try {
+                            siblingFrame.setPage(leftNode);
+                            siblingFrame.setNextLeaf(nextLeaf);
+                            siblingFrame.setPageLsn(siblingFrame.getPageLsn() + 1); // TODO:
+                            // tie
+                            // together
+                            // with
+                            // logging
+                        } finally {
+                            leftNode.releaseWriteLatch();
+                        }
+                    }
+
+                    if (rightNode != null) {
+                        rightNode.acquireWriteLatch();
+                        try {
+                            siblingFrame.setPage(rightNode);
+                            siblingFrame.setPrevLeaf(prevLeaf);
+                            siblingFrame.setPageLsn(siblingFrame.getPageLsn() + 1); // TODO:
+                            // tie
+                            // together
+                            // with
+                            // logging
+                        } finally {
+                            rightNode.releaseWriteLatch();
+                        }
+                    }
+
+                    // register pageId as a free
+                    ctx.freePages.add(pageId);
+
+                } catch (Exception e) {
+                    treeLatch.writeLock().unlock();
+                    treeLatchesReleased++;
+                    throw e;
+                } finally {
+                    if (rightNode != null) {
+                        bufferCache.unpin(rightNode);
+                    }
+                }
+            } finally {
+                if (leftNode != null) {
+                    bufferCache.unpin(leftNode);
+                }
+            }
+        } else { // leaf will not become empty
+            ctx.leafFrame.delete(data, cmp, true);
+            node.releaseWriteLatch();
+            writeLatchesReleased++;
+            bufferCache.unpin(node);
+            unpins++;
+        }
+    }
+
+    private void deleteInterior(ICachedPage node, int pageId, byte[] data, BTreeOpContext ctx) throws Exception {
+        ctx.interiorFrame.setPage(node);
+
+        // this means there is only a child pointer but no key, this case
+        // propagates the split
+        if (ctx.interiorFrame.getNumRecords() == 0) {
+            ctx.interiorFrame.setPageLsn(ctx.interiorFrame.getPageLsn() + 1); // TODO:
+            // tie
+            // together
+            // with
+            // logging
+            ctx.smPages.add(pageId);
+            ctx.interiorFrame.setSmFlag(true);
+            ctx.interiorFrame.setRightmostChildPageId(-1); // this node is
+            // completely empty
+            // register this pageId as a free page
+            ctx.freePages.add(pageId);
+
+        } else {
+            ctx.interiorFrame.delete(data, cmp, false);
+            ctx.interiorFrame.setPageLsn(ctx.interiorFrame.getPageLsn() + 1); // TODO:
+            // tie
+            // together
+            // with
+            // logging
+            ctx.splitKey.reset(); // don't propagate deletion
+        }
+    }
+
+    private final void acquireLatch(ICachedPage node, BTreeOp op, boolean isLeaf) {
+        if (isLeaf && (op.equals(BTreeOp.BTO_INSERT) || op.equals(BTreeOp.BTO_DELETE))) {
+            node.acquireWriteLatch();
+            writeLatchesAcquired++;
+        } else {
+            node.acquireReadLatch();
+            readLatchesAcquired++;
+        }
+    }
+
+    private final void releaseLatch(ICachedPage node, BTreeOp op, boolean isLeaf) {
+        if (isLeaf && (op.equals(BTreeOp.BTO_INSERT) || op.equals(BTreeOp.BTO_DELETE))) {
+            node.releaseWriteLatch();
+            writeLatchesReleased++;
+        } else {
+            node.releaseReadLatch();
+            readLatchesReleased++;
+        }
+    }
+
+    private boolean isConsistent(int pageId, BTreeOpContext ctx) throws Exception {
+        ICachedPage node = bufferCache.pin(FileInfo.getDiskPageId(fileId, pageId), false);
+        pins++;
+        node.acquireReadLatch();
+        readLatchesAcquired++;
+        ctx.interiorFrame.setPage(node);
+        boolean isConsistent = false;
+        try {
+            isConsistent = ctx.pageLsns.peek().equals(ctx.interiorFrame.getPageLsn());
+        } finally {
+            node.releaseReadLatch();
+            readLatchesReleased++;
+            bufferCache.unpin(node);
+            unpins++;
+        }
+        return isConsistent;
+    }
+
+    private void performOp(int pageId, ICachedPage parent, BTreeOpContext ctx) throws Exception {
+        ICachedPage node = bufferCache.pin(FileInfo.getDiskPageId(fileId, pageId), false);
+        pins++;
+
+        ctx.interiorFrame.setPage(node);
+        boolean isLeaf = ctx.interiorFrame.isLeaf();
+        acquireLatch(node, ctx.op, isLeaf);
+        boolean smFlag = ctx.interiorFrame.getSmFlag();
+
+        // remember trail of pageLsns, to unwind recursion in case of an ongoing
+        // structure modification
+        ctx.pageLsns.push(ctx.interiorFrame.getPageLsn());
+
+        try {
+
+            // latch coupling, note: parent should never be write latched,
+            // otherwise something is wrong.
+            if (parent != null) {
+                parent.releaseReadLatch();
+                readLatchesReleased++;
+                bufferCache.unpin(parent);
+                unpins++;
+            }
+
+            if (!isLeaf || smFlag) {
+                if (!smFlag) {
+                    // we use this loop to deal with possibly multiple operation
+                    // restarts due to ongoing structure modifications during
+                    // the descent
+                    boolean repeatOp = true;
+                    while (repeatOp && ctx.opRestarts < MAX_RESTARTS) {
+                        int childPageId = ctx.interiorFrame.getChildPageId(ctx.pred, cmp);
+                        performOp(childPageId, node, ctx);
+
+                        if (!ctx.pageLsns.isEmpty() && ctx.pageLsns.peek().equals(RESTART_OP)) {
+                            ctx.pageLsns.pop(); // pop the restart op indicator
+                            if (isConsistent(pageId, ctx)) {
+                                node = null; // to avoid unpinning and
+                                // unlatching node again in
+                                // recursive call
+                                continue; // descend the tree again
+                            } else {
+                                ctx.pageLsns.pop(); // pop pageLsn of this page
+                                // (version seen by this op
+                                // during descent)
+                                ctx.pageLsns.push(RESTART_OP); // this node is
+                                // not
+                                // consistent,
+                                // set the
+                                // restart
+                                // indicator for
+                                // upper level
+                                break;
+                            }
+                        }
+
+                        switch (ctx.op) {
+
+                            case BTO_INSERT: {
+                                if (ctx.splitKey.getData() != null) {
+                                    node = bufferCache.pin(FileInfo.getDiskPageId(fileId, pageId), false);
+                                    pins++;
+                                    node.acquireWriteLatch();
+                                    writeLatchesAcquired++;
+                                    try {
+                                        insertInterior(node, pageId, ctx.splitKey.getData(), ctx);
+                                    } finally {
+                                        node.releaseWriteLatch();
+                                        writeLatchesReleased++;
+                                        bufferCache.unpin(node);
+                                        unpins++;
+                                    }
+                                } else {
+                                    unsetSmPages(ctx);
+                                }
+                            }
+                                break;
+
+                            case BTO_DELETE: {
+                                if (ctx.splitKey.getData() != null) {
+                                    node = bufferCache.pin(FileInfo.getDiskPageId(fileId, pageId), false);
+                                    pins++;
+                                    node.acquireWriteLatch();
+                                    writeLatchesAcquired++;
+                                    try {
+                                        deleteInterior(node, pageId, ctx.pred.getLowKeys(), ctx);
+                                    } finally {
+                                        node.releaseWriteLatch();
+                                        writeLatchesReleased++;
+                                        bufferCache.unpin(node);
+                                        unpins++;
+                                    }
+                                } else {
+                                    unsetSmPages(ctx);
+                                }
+                            }
+                                break;
+
+                            case BTO_SEARCH: {
+                                // do nothing
+                            }
+                                break;
+
+                        }
+
+                        repeatOp = false; // operation completed
+
+                    } // end while
+                } else { // smFlag
+                    ctx.opRestarts++;
+                    System.out.println("ONGOING SM ON PAGE " + pageId + " AT LEVEL " + ctx.interiorFrame.getLevel()
+                            + ", RESTARTS: " + ctx.opRestarts);
+                    releaseLatch(node, ctx.op, isLeaf);
+                    bufferCache.unpin(node);
+                    unpins++;
+
+                    // TODO: this should be an instant duration lock, how to do
+                    // this in java?
+                    // instead we just immediately release the lock. this is
+                    // inefficient but still correct and will not cause
+                    // latch-deadlock
+                    treeLatch.readLock().lock();
+                    treeLatch.readLock().unlock();
+
+                    // unwind recursion and restart operation, find lowest page
+                    // with a pageLsn as seen by this operation during descent
+                    ctx.pageLsns.pop(); // pop current page lsn
+                    // put special value on the stack to inform caller of
+                    // restart
+                    ctx.pageLsns.push(RESTART_OP);
+                }
+            } else { // isLeaf and !smFlag
+                switch (ctx.op) {
+                    case BTO_INSERT: {
+                        insertLeaf(node, pageId, ctx.pred.getLowKeys(), ctx);
+                    }
+                        break;
+
+                    case BTO_DELETE: {
+                        deleteLeaf(node, pageId, ctx.pred.getLowKeys(), ctx);
+                    }
+                        break;
+
+                    case BTO_SEARCH: {
+                        ctx.cursor.open(node, ctx.pred);
+                    }
+                        break;
+
+                }
+            }
+        } catch (BTreeException e) {
+            // System.out.println("BTREE EXCEPTION");
+            // System.out.println(e.getMessage());
+            // e.printStackTrace();
+            if (!e.getHandled()) {
+                releaseLatch(node, ctx.op, isLeaf);
+                bufferCache.unpin(node);
+                unpins++;
+                e.setHandled(true);
+            }
+            throw e;
+        } catch (Exception e) { // this could be caused, e.g. by a
+            // failure to pin a new node during a
+            // split
+            System.out.println("ASTERIX EXCEPTION");
+            releaseLatch(node, ctx.op, isLeaf);
+            bufferCache.unpin(node);
+            unpins++;
+            BTreeException propException = new BTreeException(e.getMessage());
+            propException.setHandled(true); // propagate a BTreeException,
+            // indicating that the parent node
+            // must not be unlatched and
+            // unpinned
+            throw propException;
+        }
+    }
+
+    private boolean bulkNewPage = false;
+
+    public final class BulkLoadContext {
+        public int slotSize;
+        public int leafMaxBytes;
+        public int interiorMaxBytes;
+        public SplitKey splitKey = new SplitKey();
+        ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<NodeFrontier>(); // we
+        // maintain
+        // a
+        // frontier
+        // of
+        // nodes
+        // for
+        // each
+        // level
+        IBTreeFrameLeaf leafFrame;
+        IBTreeFrameInterior interiorFrame;
+        IBTreeFrameMeta metaFrame;
+
+        public BulkLoadContext(float fillFactor, IBTreeFrameLeaf leafFrame, IBTreeFrameInterior interiorFrame,
+                IBTreeFrameMeta metaFrame) throws Exception {
+            NodeFrontier leafFrontier = new NodeFrontier();
+            leafFrontier.pageId = getFreePage(metaFrame);
+            leafFrontier.page = bufferCache.pin(FileInfo.getDiskPageId(fileId, leafFrontier.pageId), bulkNewPage);
+            leafFrontier.page.acquireWriteLatch();
+
+            interiorFrame.setPage(leafFrontier.page);
+            interiorFrame.initBuffer((byte) 0);
+            interiorMaxBytes = (int) ((float) interiorFrame.getTotalFreeSpace() * fillFactor);
+
+            leafFrame.setPage(leafFrontier.page);
+            leafFrame.initBuffer((byte) 0);
+            leafMaxBytes = (int) ((float) leafFrame.getTotalFreeSpace() * fillFactor);
+
+            slotSize = leafFrame.getSlotManager().getSlotSize();
+
+            this.leafFrame = leafFrame;
+            this.interiorFrame = interiorFrame;
+            this.metaFrame = metaFrame;
+
+            nodeFrontiers.add(leafFrontier);
+        }
+
+        private void addLevel() throws Exception {
+            NodeFrontier frontier = new NodeFrontier();
+            frontier.bytesInserted = 0;
+            frontier.pageId = getFreePage(metaFrame);
+            frontier.page = bufferCache.pin(FileInfo.getDiskPageId(fileId, frontier.pageId), bulkNewPage);
+            frontier.page.acquireWriteLatch();
+            interiorFrame.setPage(frontier.page);
+            interiorFrame.initBuffer((byte) nodeFrontiers.size());
+            nodeFrontiers.add(frontier);
+        }
+    }
+
+    private void propagateBulk(BulkLoadContext ctx, int level) throws Exception {
+
+        if (ctx.splitKey.getData() == null)
+            return;
+
+        if (level >= ctx.nodeFrontiers.size())
+            ctx.addLevel();
+
+        NodeFrontier frontier = ctx.nodeFrontiers.get(level);
+        ctx.interiorFrame.setPage(frontier.page);
+        byte[] insBytes = ctx.splitKey.getData();
+        int keySize = cmp.getKeySize(insBytes, 0);
+        int spaceNeeded = keySize + ctx.slotSize + 4; // TODO: this is a dirty
+        // constant for the size
+        // of a child pointer
+        if (frontier.bytesInserted + spaceNeeded > ctx.interiorMaxBytes) {
+            ctx.interiorFrame.deleteGreatest(cmp);
+            int splitKeySize = cmp.getKeySize(frontier.lastRecord, 0);
+            ctx.splitKey.initData(splitKeySize);
+            System.arraycopy(frontier.lastRecord, 0, ctx.splitKey.getData(), 0, splitKeySize);
+            ctx.splitKey.setLeftPage(frontier.pageId);
+
+            frontier.page.releaseWriteLatch();
+            bufferCache.unpin(frontier.page);
+            frontier.pageId = getFreePage(ctx.metaFrame);
+
+            ctx.splitKey.setRightPage(frontier.pageId);
+            propagateBulk(ctx, level + 1);
+
+            frontier.page = bufferCache.pin(FileInfo.getDiskPageId(fileId, frontier.pageId), bulkNewPage);
+            frontier.page.acquireWriteLatch();
+            ctx.interiorFrame.setPage(frontier.page);
+            ctx.interiorFrame.initBuffer((byte) level);
+            frontier.bytesInserted = 0;
+        }
+        ctx.interiorFrame.insertSorted(insBytes, cmp);
+        frontier.lastRecord = insBytes;
+        frontier.bytesInserted += spaceNeeded;
+    }
+
+    // assumes btree has been created and opened
+    public BulkLoadContext beginBulkLoad(float fillFactor, IBTreeFrameLeaf leafFrame,
+            IBTreeFrameInterior interiorFrame, IBTreeFrameMeta metaFrame) throws Exception {
+        BulkLoadContext ctx = new BulkLoadContext(fillFactor, leafFrame, interiorFrame, metaFrame);
+        return ctx;
+    }
+
+    public void bulkLoadAddRecord(BulkLoadContext ctx, byte[] record) throws Exception {
+        NodeFrontier leafFrontier = ctx.nodeFrontiers.get(0);
+        IBTreeFrameLeaf leafFrame = ctx.leafFrame;
+
+        int spaceNeeded = record.length + ctx.slotSize;
+        if (leafFrontier.bytesInserted + spaceNeeded > ctx.leafMaxBytes) {
+            int splitKeySize = cmp.getKeySize(leafFrontier.lastRecord, 0);
+            ctx.splitKey.initData(splitKeySize);
+            System.arraycopy(leafFrontier.lastRecord, 0, ctx.splitKey.getData(), 0, splitKeySize);
+            ctx.splitKey.setLeftPage(leafFrontier.pageId);
+            int prevPageId = leafFrontier.pageId;
+            leafFrontier.pageId = getFreePage(ctx.metaFrame);
+            leafFrame.setNextLeaf(leafFrontier.pageId);
+            leafFrontier.page.releaseWriteLatch();
+            bufferCache.unpin(leafFrontier.page);
+
+            ctx.splitKey.setRightPage(leafFrontier.pageId);
+            propagateBulk(ctx, 1);
+
+            leafFrontier.page = bufferCache.pin(FileInfo.getDiskPageId(fileId, leafFrontier.pageId), bulkNewPage);
+            leafFrontier.page.acquireWriteLatch();
+            leafFrame.setPage(leafFrontier.page);
+            leafFrame.initBuffer((byte) 0);
+            leafFrame.setPrevLeaf(prevPageId);
+            leafFrontier.bytesInserted = 0;
+        }
+
+        leafFrame.setPage(leafFrontier.page);
+        leafFrame.insertSorted(record, cmp);
+        leafFrontier.lastRecord = record;
+        leafFrontier.bytesInserted += spaceNeeded;
+    }
+
+    public void endBulkLoad(BulkLoadContext ctx) throws Exception {
+        // copy root
+        ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), bulkNewPage);
+        rootNode.acquireWriteLatch();
+        try {
+            ICachedPage toBeRoot = ctx.nodeFrontiers.get(ctx.nodeFrontiers.size() - 1).page;
+            System.arraycopy(toBeRoot.getBuffer().array(), 0, rootNode.getBuffer().array(), 0, toBeRoot.getBuffer()
+                    .capacity());
+        } finally {
+            rootNode.releaseWriteLatch();
+            bufferCache.unpin(rootNode);
+
+            // cleanup
+            for (int i = 0; i < ctx.nodeFrontiers.size(); i++) {
+                ctx.nodeFrontiers.get(i).page.releaseWriteLatch();
+                bufferCache.unpin(ctx.nodeFrontiers.get(i).page);
+            }
+        }
+        // debug
+        currentLevel = (byte) ctx.nodeFrontiers.size();
+    }
+
+    public IBTreeFrameInteriorFactory getInteriorFrameFactory() {
+        return interiorFrameFactory;
+    }
+
+    public IBTreeFrameLeafFactory getLeafFrameFactory() {
+        return leafFrameFactory;
+    }
+
+    public ISlotManagerFactory getInteriorSlotManagerFactory() {
+        return interiorSlotManagerFactory;
+    }
+
+    public ISlotManagerFactory getLeafSlotManagerFactory() {
+        return leafSlotManagerFactory;
+    }
+
+    public MultiComparator getMultiComparator() {
+        return cmp;
+    }
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeDiskOrderScanCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeDiskOrderScanCursor.java
new file mode 100644
index 0000000..47f43ab
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeDiskOrderScanCursor.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+
+public class BTreeDiskOrderScanCursor implements IBTreeCursor {
+
+    // TODO: might want to return records in physical order, not logical order to speed up access
+
+    private int recordNum = 0;
+    private int recordOffset = -1;
+    private int fileId = -1;
+    int currentPageId = -1;
+    int maxPageId = -1; // TODO: figure out how to scan to the end of file, this is dirty and may not with concurrent updates
+    private ICachedPage page = null;
+    private IBTreeFrameLeaf frame = null;
+    private IBufferCache bufferCache = null;
+
+    public BTreeDiskOrderScanCursor(IBTreeFrameLeaf frame) {
+        this.frame = frame;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        page.releaseReadLatch();
+        bufferCache.unpin(page);
+        page = null;
+    }
+
+    @Override
+    public int getOffset() {
+        return recordOffset;
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        return page;
+    }
+
+    private boolean positionToNextLeaf(boolean skipCurrent) throws HyracksDataException {
+        while ((frame.getLevel() != 0 || skipCurrent) && currentPageId <= maxPageId) {
+            currentPageId++;
+
+            ICachedPage nextPage = bufferCache.pin(FileInfo.getDiskPageId(fileId, currentPageId), false);
+            nextPage.acquireReadLatch();
+
+            page.releaseReadLatch();
+            bufferCache.unpin(page);
+
+            page = nextPage;
+            frame.setPage(page);
+            recordNum = 0;
+            skipCurrent = false;
+        }
+        if (currentPageId <= maxPageId)
+            return true;
+        else
+            return false;
+    }
+
+    @Override
+    public boolean hasNext() throws HyracksDataException {
+        if (recordNum >= frame.getNumRecords()) {
+            boolean nextLeafExists = positionToNextLeaf(true);
+            if (nextLeafExists) {
+                recordOffset = frame.getRecordOffset(recordNum);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        recordOffset = frame.getRecordOffset(recordNum);
+        return true;
+    }
+
+    @Override
+    public void next() throws HyracksDataException {
+        recordNum++;
+    }
+
+    @Override
+    public void open(ICachedPage page, ISearchPredicate searchPred) throws HyracksDataException {
+        // in case open is called multiple times without closing
+        if (this.page != null) {
+            this.page.releaseReadLatch();
+            bufferCache.unpin(this.page);
+        }
+
+        this.page = page;
+        recordNum = 0;
+        frame.setPage(page);
+        boolean leafExists = positionToNextLeaf(false);
+        if (!leafExists) {
+            throw new HyracksDataException(
+                    "Failed to open disk-order scan cursor for B-tree. Traget B-tree has no leaves.");
+        }
+    }
+
+    @Override
+    public void reset() {
+        recordNum = 0;
+        recordOffset = 0;
+        currentPageId = -1;
+        maxPageId = -1;
+        page = null;
+    }
+
+    @Override
+    public void setBufferCache(IBufferCache bufferCache) {
+        this.bufferCache = bufferCache;
+    }
+
+    @Override
+    public void setFileId(int fileId) {
+        this.fileId = fileId;
+    }
+
+    public void setCurrentPageId(int currentPageId) {
+        this.currentPageId = currentPageId;
+    }
+
+    public void setMaxPageId(int maxPageId) {
+        this.maxPageId = maxPageId;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeException.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeException.java
new file mode 100644
index 0000000..4e6d9d6
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+public class BTreeException extends Exception {
+	
+	private static final long serialVersionUID = 1L;
+	private boolean handled = false;
+		
+	public BTreeException(String message) {
+        super(message);
+    }
+	
+	public void setHandled(boolean handled) {
+		this.handled = handled;
+	}
+	
+	public boolean getHandled() {
+		return handled;
+	}	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeMeta.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeMeta.java
new file mode 100644
index 0000000..696a5be
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeMeta.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameMeta;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+// all meta pages of this kind have a negative level
+// the first meta page has level -1, all other meta pages have level -2
+// the first meta page is special because it guarantees to have a correct max page
+// other meta pages (i.e., with level -2) have junk in the max page field
+
+public class BTreeMeta implements IBTreeFrameMeta {
+        
+    protected static final int numRecordsOff = 0;             
+    protected static final int freeSpaceOff = numRecordsOff + 4;
+    protected static final int maxPageOff = freeSpaceOff + 4;
+    protected static final byte levelOff = maxPageOff + 1;
+    protected static final byte nextPageOff = maxPageOff + 8;
+    
+    protected ICachedPage page = null;
+    protected ByteBuffer buf = null;    
+    
+    public int getMaxPage() {
+        return buf.getInt(maxPageOff);
+    }
+    
+    public void setMaxPage(int maxPage) {
+        buf.putInt(maxPageOff, maxPage);
+    }
+        
+    public int getFreePage() {                
+        int numRecords = buf.getInt(numRecordsOff); 
+        if(numRecords > 0) {
+            // return the last page from the linked list of free pages
+            // TODO: this is a dumb policy, but good enough for now
+            int lastPageOff = buf.getInt(freeSpaceOff) - 4;
+            buf.putInt(freeSpaceOff, lastPageOff);
+            buf.putInt(numRecordsOff, numRecords - 1);
+            return buf.getInt(lastPageOff);
+        }
+        else {
+            return -1;
+        }                                    
+    }
+    
+    
+    // must be checked before adding free page
+    // user of this class is responsible for getting a free page as a new meta page, latching it, etc. if there is no space on this page
+    public boolean hasSpace() {
+        return buf.getInt(freeSpaceOff) + 4 < buf.capacity();
+    }
+    
+    // on bounds checking is done, there must be free space
+    public void addFreePage(int freePage) {
+        int freeSpace = buf.getInt(freeSpaceOff);
+        buf.putInt(freeSpace, freePage);
+        buf.putInt(freeSpaceOff, freeSpace + 4);
+        buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+    }
+
+    @Override
+    public byte getLevel() {        
+        return buf.get(levelOff);
+    }
+    
+    @Override
+    public void setLevel(byte level) {
+        buf.put(levelOff, level);           
+    }
+    
+    @Override
+    public ICachedPage getPage() {
+        return page;
+    }   
+
+    @Override
+    public void setPage(ICachedPage page) {
+        this.page = page;
+        this.buf = page.getBuffer();        
+    }
+
+    @Override
+    public void initBuffer(int level) {
+        buf.putInt(freeSpaceOff, nextPageOff + 4);
+        buf.putInt(numRecordsOff, 0);
+        buf.putInt(levelOff, level);
+        buf.putInt(nextPageOff, -1);
+    }
+    
+    @Override
+    public int getNextPage() {
+        return buf.getInt(nextPageOff);
+    }
+
+    @Override
+    public void setNextPage(int nextPage) {
+        buf.putInt(nextPageOff, nextPage);
+    }        
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeMetaFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeMetaFactory.java
new file mode 100644
index 0000000..88d8bc9
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeMetaFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameMeta;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameMetaFactory;
+
+public class BTreeMetaFactory implements IBTreeFrameMetaFactory {
+    @Override
+    public IBTreeFrameMeta getFrame() {     
+        return new BTreeMeta();
+    }   
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSM.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSM.java
new file mode 100644
index 0000000..77dbd62
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSM.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrame;
+
+public abstract class BTreeNSM extends NSMFrame implements IBTreeFrame {
+	
+	protected static final byte levelOff = totalFreeSpaceOff + 4; 	
+	protected static final byte smFlagOff = levelOff + 1;
+	
+	public BTreeNSM() {
+		super();
+	}
+	
+	@Override
+	public void initBuffer(byte level) {
+		super.initBuffer(level);
+		buf.put(levelOff, level);
+		buf.put(smFlagOff, (byte)0);
+	}
+		
+	@Override
+	public boolean isLeaf() {
+		return buf.get(levelOff) == 0;
+	}
+			
+	@Override
+	public byte getLevel() {		
+		return buf.get(levelOff);
+	}
+	
+	@Override
+	public void setLevel(byte level) {
+		buf.put(levelOff, level);				
+	}
+	
+	@Override
+	public boolean getSmFlag() {
+		return buf.get(smFlagOff) != 0;
+	}
+
+	@Override
+	public void setSmFlag(boolean smFlag) {
+		if(smFlag)			
+			buf.put(smFlagOff, (byte)1);		
+		else
+			buf.put(smFlagOff, (byte)0);			
+	}		
+	
+	// TODO: remove
+	@Override
+	public int getFreeSpaceOff() {
+		return buf.getInt(freeSpaceOff);
+	}
+
+	@Override
+	public void setFreeSpaceOff(int freeSpace) {
+		buf.putInt(freeSpaceOff, freeSpace);		
+	}
+	
+	@Override
+    public void compress(MultiComparator cmp) {        
+    }
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMInterior.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMInterior.java
new file mode 100644
index 0000000..18d53f8
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMInterior.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrame;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameInterior;
+
+public class BTreeNSMInterior extends BTreeNSM implements IBTreeFrameInterior {
+		
+	private static final int rightLeafOff = smFlagOff + 1;
+	
+	private static final int childPtrSize = 4;
+	
+	public BTreeNSMInterior() {
+		super();
+	}
+	
+	private int getLeftChildPageOff(int recOff, MultiComparator cmp) {
+		for(int i = 0; i < cmp.size(); i++) {
+			recOff += cmp.getFields()[i].getLength(buf.array(), recOff);
+		}		
+		return recOff;
+	}	
+	
+	@Override
+	public void initBuffer(byte level) {
+		super.initBuffer(level);
+		buf.putInt(rightLeafOff, -1);
+	}
+	
+	@Override
+	public void insert(byte[] data, MultiComparator cmp) throws Exception {
+		
+		int slotOff = slotManager.findSlot(buf, data, cmp, false);
+		boolean isDuplicate = true;
+		
+		if(slotOff < 0) isDuplicate = false; // greater than all existing keys
+		else if(cmp.compare(data, 0, buf.array(), slotManager.getRecOff(slotOff)) != 0) isDuplicate = false;
+		
+		if(isDuplicate) {			
+			throw new BTreeException("Trying to insert duplicate value into interior node.");
+		}
+		else {			
+			slotOff = slotManager.insertSlot(slotOff, buf.getInt(freeSpaceOff));			
+						
+			int recOff = buf.getInt(freeSpaceOff);
+			int recSize = cmp.getKeySize(data, 0) + childPtrSize;
+			System.arraycopy(data, 0, buf.array(), recOff, recSize);		
+			
+			buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+			buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + recSize);
+			buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - recSize - slotManager.getSlotSize());
+			
+			// did insert into the rightmost slot?
+			if(slotOff == slotManager.getSlotStartOff()) { 
+				System.arraycopy(data, recSize, buf.array(), rightLeafOff, childPtrSize);
+			}
+			else {
+				// if slotOff has a right (slot-)neighbor then update its child pointer
+				// the only time when this is NOT the case, is when this is the first record 
+				// (or when the splitkey goes into the rightmost slot but that case was handled in the if above)
+				
+				if(buf.getInt(numRecordsOff) > 1) {					
+					int rightNeighborOff = slotOff - slotManager.getSlotSize();
+					int keySize = cmp.getKeySize(buf.array(), slotManager.getRecOff(rightNeighborOff));
+					System.arraycopy(data, recSize, buf.array(), slotManager.getRecOff(rightNeighborOff) + keySize, childPtrSize);
+				}				
+			}			
+		}
+		
+		//System.out.println("INSSPACEA: " + (buf.capacity() - buf.getInt(freeSpaceOff) - (buf.getInt(numRecordsOff) * slotManager.getSlotSize())));
+		//System.out.println("INSSPACEB: " + (buf.getInt(totalFreeSpaceOff)));
+	}
+	
+	@Override
+	public void insertSorted(byte[] data, MultiComparator cmp) throws Exception {
+		int recOff = buf.getInt(freeSpaceOff);
+		slotManager.insertSlot(-1, buf.getInt(freeSpaceOff));
+		int recSize = cmp.getKeySize(data, 0) + childPtrSize;
+		System.arraycopy(data, 0, buf.array(), recOff, recSize);
+		buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+		buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + recSize);
+		buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - recSize - slotManager.getSlotSize());
+		System.arraycopy(data, recSize, buf.array(), rightLeafOff, childPtrSize);			
+	}
+		
+	@Override
+	public int split(IBTreeFrame rightFrame, byte[] data, MultiComparator cmp, SplitKey splitKey) throws Exception {		
+		// before doing anything check if key already exists
+		int slotOff = slotManager.findSlot(buf, data, cmp, true);
+		if(slotOff >= 0) {
+			if(cmp.compare(data, 0, buf.array(), slotManager.getRecOff(slotOff)) == 0) {				
+				throw new BTreeException("Inserting duplicate key in interior node during split");				
+			}
+		}
+		
+		ByteBuffer right = rightFrame.getBuffer();
+		int numRecords = buf.getInt(numRecordsOff);
+		
+		int recordsToLeft = (numRecords / 2) + (numRecords % 2);
+		IBTreeFrame targetFrame = null;
+		if(cmp.compare(data, 0, buf.array(), getRecordOffset(recordsToLeft-1)) <= 0) {
+			targetFrame = this;
+		}
+		else {
+			targetFrame = rightFrame;
+		}			
+		int recordsToRight = numRecords - recordsToLeft;		
+				
+		// copy entire page
+		System.arraycopy(buf.array(), 0, right.array(), 0, buf.capacity());
+		
+		// on right page we need to copy rightmost slots to left		
+		int src = rightFrame.getSlotManager().getSlotStartOff();
+		int dest = rightFrame.getSlotManager().getSlotStartOff() + recordsToLeft * rightFrame.getSlotManager().getSlotSize();
+		int length = rightFrame.getSlotManager().getSlotSize() * recordsToRight;
+		System.arraycopy(right.array(), src, right.array(), dest, length);				
+		right.putInt(numRecordsOff, recordsToRight);
+				
+		// on left page, remove highest key and make its childpointer the rightmost childpointer
+		buf.putInt(numRecordsOff, recordsToLeft);
+				
+		// copy data to be inserted, we need this because creating the splitkey will overwrite the data param (data points to same memory as splitKey.getData())
+		byte[] savedData = new byte[data.length];
+		System.arraycopy(data, 0, savedData, 0, data.length);
+		
+		// set split key to be highest value in left page	
+		int recOff = slotManager.getRecOff(slotManager.getSlotStartOff());
+		int splitKeySize = cmp.getKeySize(buf.array(), recOff);
+		splitKey.initData(splitKeySize);
+		System.arraycopy(buf.array(), recOff, splitKey.getData(), 0, splitKeySize);
+		
+		int deleteRecOff = slotManager.getRecOff(slotManager.getSlotStartOff());
+		int deleteKeySize = cmp.getKeySize(buf.array(), deleteRecOff); 		
+		buf.putInt(rightLeafOff, buf.getInt(deleteRecOff + deleteKeySize));
+		buf.putInt(numRecordsOff, recordsToLeft - 1);
+				
+		// compact both pages
+		rightFrame.compact(cmp);
+		compact(cmp);
+			
+		// insert key
+		targetFrame.insert(savedData, cmp);
+			
+		return 0;
+	}	
+	
+	@Override
+	public void compact(MultiComparator cmp) {		
+		resetSpaceParams();		
+		
+		int numRecords = buf.getInt(numRecordsOff);
+		int freeSpace = buf.getInt(freeSpaceOff);
+		byte[] data = buf.array();
+		
+		ArrayList<SlotOffRecOff> sortedRecOffs = new ArrayList<SlotOffRecOff>();
+		sortedRecOffs.ensureCapacity(numRecords);
+		for(int i = 0; i < numRecords; i++) {			
+			int slotOff = slotManager.getSlotOff(i);
+			int recOff = slotManager.getRecOff(slotOff);
+			sortedRecOffs.add(new SlotOffRecOff(slotOff, recOff));
+		}
+		Collections.sort(sortedRecOffs);
+		
+		for(int i = 0; i < sortedRecOffs.size(); i++) {
+			int recOff = sortedRecOffs.get(i).recOff;
+			int recSize = cmp.getKeySize(data, recOff) + childPtrSize; // only difference from compact in NSMFrame
+			System.arraycopy(data, recOff, data, freeSpace, recSize);
+			slotManager.setSlot(sortedRecOffs.get(i).slotOff, freeSpace);		
+			freeSpace += recSize;
+		}
+		
+		buf.putInt(freeSpaceOff, freeSpace);
+		buf.putInt(totalFreeSpaceOff, buf.capacity() - freeSpace - numRecords * slotManager.getSlotSize());
+	}
+		
+	@Override
+	public int getChildPageId(RangePredicate pred, MultiComparator srcCmp) {				
+		// check for trivial case where there is only a child pointer (and no key)
+		if(buf.getInt(numRecordsOff) == 0) {			
+			return buf.getInt(rightLeafOff);
+		}
+		
+		// check for trivial cases where no low key or high key exists (e.g. during an index scan)
+		byte[] data = null;
+		if(pred.isForward()) {						
+			data = pred.getLowKeys();
+			if(data == null) {
+				return getLeftmostChildPageId(srcCmp);
+			}			
+		}
+		else {
+			data = pred.getHighKeys();
+			if(data == null) {
+				return getRightmostChildPageId(srcCmp);				
+			}								
+		}
+		
+		MultiComparator targetCmp = pred.getComparator();
+		int slotOff = slotManager.findSlot(buf, data, targetCmp, false);
+		if(slotOff < 0) {
+			return buf.getInt(rightLeafOff);
+		}
+		else {
+			int origRecOff = slotManager.getRecOff(slotOff);
+			int cmpRecOff = origRecOff;
+			if(pred.isForward()) {
+				int maxSlotOff = buf.capacity();
+				slotOff += slotManager.getSlotSize();		
+				while(slotOff < maxSlotOff) {
+					cmpRecOff = slotManager.getRecOff(slotOff);
+					if(targetCmp.compare(buf.array(), origRecOff, buf.array(), cmpRecOff) != 0) break;					
+					slotOff += slotManager.getSlotSize();			
+				}
+				slotOff -= slotManager.getSlotSize();
+			}
+			else {
+				int minSlotOff = slotManager.getSlotStartOff() - slotManager.getSlotSize();
+				slotOff -= slotManager.getSlotSize();
+				while(slotOff > minSlotOff) {
+					cmpRecOff = slotManager.getRecOff(slotOff);
+					if(targetCmp.compare(buf.array(), origRecOff, buf.array(), cmpRecOff) != 0) break;
+					slotOff -= slotManager.getSlotSize();										
+				}
+				slotOff += slotManager.getSlotSize();
+			}
+			
+			int childPageOff = getLeftChildPageOff(slotManager.getRecOff(slotOff), srcCmp);			
+			return buf.getInt(childPageOff);
+		}				
+	}	
+	
+	@Override
+	public void delete(byte[] data, MultiComparator cmp, boolean exactDelete) throws Exception {
+		int slotOff = slotManager.findSlot(buf, data, cmp, false);
+		int recOff;
+		int keySize;
+		
+		if(slotOff < 0) {						
+			recOff = slotManager.getRecOff(slotManager.getSlotStartOff());
+			keySize = cmp.getKeySize(buf.array(), recOff);
+			// copy new rightmost pointer
+			System.arraycopy(buf.array(), recOff + keySize, buf.array(), rightLeafOff, childPtrSize);						
+		}
+		else {						
+			recOff = slotManager.getRecOff(slotOff);
+			keySize = cmp.getKeySize(buf.array(), recOff);	
+			// perform deletion (we just do a memcpy to overwrite the slot)
+			int slotStartOff = slotManager.getSlotStartOff();
+			int length = slotOff - slotStartOff;
+			System.arraycopy(buf.array(), slotStartOff, buf.array(), slotStartOff + slotManager.getSlotSize(), length);						
+		}
+		
+		// maintain space information
+		buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) - 1);
+		buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + keySize + childPtrSize + slotManager.getSlotSize());
+	}
+	
+	@Override
+	protected void resetSpaceParams() {
+		buf.putInt(freeSpaceOff, rightLeafOff + childPtrSize);
+		buf.putInt(totalFreeSpaceOff, buf.capacity() - (rightLeafOff + childPtrSize));
+	}
+		
+	@Override
+	public int getLeftmostChildPageId(MultiComparator cmp) {						
+		int recOff = slotManager.getRecOff(slotManager.getSlotEndOff());
+		int childPageOff = getLeftChildPageOff(recOff, cmp);		
+		return buf.getInt(childPageOff);
+	}
+
+	@Override
+	public int getRightmostChildPageId(MultiComparator cmp) {
+		return buf.getInt(rightLeafOff);
+	}
+
+	@Override
+	public void setRightmostChildPageId(int pageId) {
+		buf.putInt(rightLeafOff, pageId);
+	}		
+	
+	// for debugging
+	public ArrayList<Integer> getChildren(MultiComparator cmp) {		
+		ArrayList<Integer> ret = new ArrayList<Integer>();		
+		int numRecords = buf.getInt(numRecordsOff);
+		for(int i = 0; i < numRecords; i++) {
+			int recOff = slotManager.getRecOff(slotManager.getSlotOff(i));
+			int keySize = cmp.getKeySize(buf.array(), recOff);			
+			int intVal = getInt(buf.array(), recOff + keySize);			
+			ret.add(intVal);
+		}
+		if(!isLeaf()) {
+			int rightLeaf = buf.getInt(rightLeafOff);
+			if(rightLeaf > 0) ret.add(buf.getInt(rightLeafOff));
+		}
+		return ret;
+	}
+
+	@Override
+	public void deleteGreatest(MultiComparator cmp) {
+		int slotOff = slotManager.getSlotStartOff();
+		int recOff = slotManager.getRecOff(slotOff);
+		int keySize = cmp.getKeySize(buf.array(), recOff); 
+		System.arraycopy(buf.array(), recOff + keySize, buf.array(), rightLeafOff, childPtrSize);
+				
+		// maintain space information
+		buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) - 1);
+		buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + keySize + childPtrSize + slotManager.getSlotSize());
+		
+		int freeSpace = buf.getInt(freeSpaceOff);
+		if(freeSpace == recOff + keySize + childPtrSize) {
+			buf.putInt(freeSpace, freeSpace - (keySize + childPtrSize));
+		}		
+	}		
+	
+	private int getInt(byte[] bytes, int offset) {
+		return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
+	}
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMInteriorFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMInteriorFactory.java
new file mode 100644
index 0000000..5360f7d
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMInteriorFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameInterior;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameInteriorFactory;
+
+public class BTreeNSMInteriorFactory implements IBTreeFrameInteriorFactory {
+	@Override
+	public IBTreeFrameInterior getFrame() {		
+		return new BTreeNSMInterior();
+	}	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMLeaf.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMLeaf.java
new file mode 100644
index 0000000..852dc31
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMLeaf.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrame;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+
+public class BTreeNSMLeaf extends BTreeNSM implements IBTreeFrameLeaf {    
+	protected static final int prevLeafOff = smFlagOff + 1;
+	protected static final int nextLeafOff = prevLeafOff + 4;
+	
+	public BTreeNSMLeaf() {
+		super();
+	}
+	
+	@Override
+	public void initBuffer(byte level) {
+		super.initBuffer(level);
+		buf.putInt(prevLeafOff, -1);
+		buf.putInt(nextLeafOff, -1);
+	}
+
+	@Override
+	public void setNextLeaf(int page) {
+		buf.putInt(nextLeafOff, page);
+	}
+
+	@Override
+	public void setPrevLeaf(int page) {
+		buf.putInt(prevLeafOff, page);
+	}
+
+	@Override
+	public int getNextLeaf() {
+		return buf.getInt(nextLeafOff);
+	}
+
+	@Override
+	public int getPrevLeaf() {
+		return buf.getInt(prevLeafOff);
+	}
+
+	@Override
+	public void insert(byte[] data, MultiComparator cmp) throws Exception {		
+		int slotOff = slotManager.findSlot(buf, data, cmp, false);
+		boolean isDuplicate = true;
+
+		if (slotOff < 0) isDuplicate = false; // greater than all existing keys
+		else if (cmp.compare(data, 0, buf.array(), slotManager.getRecOff(slotOff)) != 0) isDuplicate = false;
+
+		if (isDuplicate) {
+			throw new BTreeException("Trying to insert duplicate value into leaf of unique index");
+		} 
+		else {
+			slotOff = slotManager.insertSlot(slotOff, buf.getInt(freeSpaceOff));
+			
+			int recOff = buf.getInt(freeSpaceOff);
+			System.arraycopy(data, 0, buf.array(), recOff, data.length);
+
+			buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+			buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + data.length);
+			buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - data.length - slotManager.getSlotSize());
+		}
+	}
+	
+	@Override
+	public void insertSorted(byte[] data, MultiComparator cmp) throws Exception {		
+		int recOff = buf.getInt(freeSpaceOff);
+		slotManager.insertSlot(-1, recOff);
+		System.arraycopy(data, 0, buf.array(), recOff, data.length);
+		buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+		buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + data.length);
+		buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - data.length - slotManager.getSlotSize());
+	}
+
+	@Override
+	public int split(IBTreeFrame rightFrame, byte[] data, MultiComparator cmp, SplitKey splitKey) throws Exception {
+		// before doing anything check if key already exists
+		int slotOff = slotManager.findSlot(buf, data, cmp, true);
+		if (slotOff >= 0) {			
+			if (cmp.compare(data, 0, buf.array(), slotManager.getRecOff(slotOff)) == 0) {
+				throw new BTreeException("Inserting duplicate key into unique index");
+			}
+		}
+		
+		ByteBuffer right = rightFrame.getBuffer();
+		int numRecords = getNumRecords();
+
+		int recordsToLeft;
+		int mid = numRecords / 2;
+		IBTreeFrame targetFrame = null;
+		if ((cmp.compare(data, 0, buf.array(), slotManager.getRecOff(slotManager.getSlotStartOff() + slotManager.getSlotSize() * mid))) >= 0) {
+			recordsToLeft = mid + (numRecords % 2);
+			targetFrame = rightFrame;
+		} else {
+			recordsToLeft = mid;
+			targetFrame = this;
+		}
+		int recordsToRight = numRecords - recordsToLeft;
+				
+		// copy entire page
+		System.arraycopy(buf.array(), 0, right.array(), 0, buf.capacity());
+		
+		// on right page we need to copy rightmost slots to left
+		int src = rightFrame.getSlotManager().getSlotStartOff();
+		int dest = rightFrame.getSlotManager().getSlotStartOff() + recordsToLeft * rightFrame.getSlotManager().getSlotSize();
+		int length = rightFrame.getSlotManager().getSlotSize() * recordsToRight; 
+		System.arraycopy(right.array(), src, right.array(), dest, length);
+		right.putInt(numRecordsOff, recordsToRight);
+		
+		// on left page only change the numRecords indicator
+		buf.putInt(numRecordsOff, recordsToLeft);
+			
+		// compact both pages
+		rightFrame.compact(cmp);
+		compact(cmp);
+		
+		// insert last key
+		targetFrame.insert(data, cmp);			
+		
+		// set split key to be highest value in left page
+		int recOff = slotManager.getRecOff(slotManager.getSlotStartOff());
+		int keySize = cmp.getKeySize(buf.array(), recOff);				
+		splitKey.initData(keySize);
+		System.arraycopy(buf.array(), recOff, splitKey.getData(), 0, keySize);
+		
+		return 0;
+	}
+
+	@Override
+	protected void resetSpaceParams() {
+		buf.putInt(freeSpaceOff, nextLeafOff + 4);
+		buf.putInt(totalFreeSpaceOff, buf.capacity() - (nextLeafOff + 4));
+	}
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMLeafFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMLeafFactory.java
new file mode 100644
index 0000000..7691b7d
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeNSMLeafFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeafFactory;
+
+public class BTreeNSMLeafFactory implements IBTreeFrameLeafFactory {
+	@Override
+	public IBTreeFrameLeaf getFrame() {		
+		return new BTreeNSMLeaf();
+	}
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOp.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOp.java
new file mode 100644
index 0000000..27fe7cd
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOp.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+public enum BTreeOp {
+	BTO_INSERT,
+	BTO_DELETE,
+	BTO_SEARCH	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java
new file mode 100644
index 0000000..9262b89
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.util.ArrayList;
+import java.util.Stack;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameInterior;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameMeta;
+
+public final class BTreeOpContext {
+	public BTreeOp op;
+	public IBTreeFrameLeaf leafFrame;
+	public IBTreeFrameInterior interiorFrame;
+	public IBTreeFrameMeta metaFrame;
+	public IBTreeCursor cursor;
+	public RangePredicate pred;	
+	public SplitKey splitKey;
+	public int opRestarts = 0;
+	public ArrayList<Integer> smPages;	
+	public Stack<Integer> pageLsns;
+	public ArrayList<Integer> freePages;
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
new file mode 100644
index 0000000..fc97363
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeRangeSearchCursor.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IBTreeFrameLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+
+public class BTreeRangeSearchCursor implements IBTreeCursor {
+
+	private ISearchPredicate searchPred = null;	
+	private int recordNum = 0;
+	private int recordOffset = -1;
+	private int fileId = -1;
+	private ICachedPage page = null;
+	private IBTreeFrameLeaf frame = null;
+	private IBufferCache bufferCache = null;
+	
+	public BTreeRangeSearchCursor(IBTreeFrameLeaf frame) {
+		this.frame = frame;		
+	}
+	
+	@Override
+	public void close() throws Exception {
+		page.releaseReadLatch();
+		bufferCache.unpin(page);
+		page = null;
+	}
+
+	@Override
+	public int getOffset() {
+		return recordOffset;
+	}
+
+	@Override
+	public ICachedPage getPage() {
+		return page;
+	}
+		
+	@Override
+	public boolean hasNext() throws Exception {
+		if(recordNum >= frame.getNumRecords()) {
+			int nextLeafPage = -1;
+			if(searchPred.isForward()) {
+				nextLeafPage = frame.getNextLeaf();
+			}
+			else {
+				nextLeafPage = frame.getPrevLeaf();
+			}
+						
+			if(nextLeafPage >= 0) {			
+				ICachedPage nextLeaf = bufferCache.pin(FileInfo.getDiskPageId(fileId, nextLeafPage), false);
+				nextLeaf.acquireReadLatch();
+								
+				page.releaseReadLatch();
+				bufferCache.unpin(page);
+				
+				page = nextLeaf;
+				frame.setPage(page);
+				
+				recordNum = 0;
+			}
+			else {
+				return false;
+			}
+		}
+		
+		// in any case compare current key
+		RangePredicate pred = (RangePredicate)searchPred;
+		MultiComparator cmp = pred.getComparator();
+		if(searchPred.isForward()) {
+			byte[] highKeys = pred.getHighKeys();			
+			recordOffset = frame.getRecordOffset(recordNum);
+			
+			if(highKeys == null) return true;						
+			if(cmp.compare(highKeys, 0, page.getBuffer().array(), recordOffset) < 0) {
+				return false;
+			}
+			else {
+				return true;
+			}
+		}
+		else {
+			byte[] lowKeys = pred.getLowKeys();			
+			recordOffset = frame.getRecordOffset(frame.getNumRecords() - recordNum - 1);
+			if(lowKeys == null) return true;
+			
+			if(cmp.compare(lowKeys, 0, page.getBuffer().array(), recordOffset) > 0) {
+				return false;
+			}
+			else {
+				return true;
+			}
+		}		
+	}
+
+	@Override
+	public void next() throws Exception {		
+		recordNum++;
+	}
+	
+	@Override
+	public void open(ICachedPage page, ISearchPredicate searchPred) throws Exception {		
+		// in case open is called multiple times without closing
+		if(this.page != null) {
+			this.page.releaseReadLatch();
+			bufferCache.unpin(this.page);
+		}
+		
+		this.searchPred = searchPred;
+		this.page = page;
+		frame.setPage(page);
+		
+		// position recordNum to the first appropriate key
+		// TODO: can be done more efficiently with binary search but this needs some thinking/refactoring
+		RangePredicate pred = (RangePredicate)searchPred;
+		MultiComparator cmp = pred.getComparator();
+		if(searchPred.isForward()) {
+			byte[] lowKeys = pred.getLowKeys();
+						
+			recordOffset = frame.getRecordOffset(recordNum);
+			if(lowKeys == null) return; // null means -infinity
+			
+			while(cmp.compare(lowKeys, 0, page.getBuffer().array(), recordOffset) > 0 && recordNum < frame.getNumRecords()) {				
+			    recordNum++;
+			    recordOffset = frame.getRecordOffset(recordNum);
+			}
+		}
+		else {
+			byte[] highKeys = pred.getHighKeys();
+						
+			recordOffset = frame.getRecordOffset(frame.getNumRecords() - recordNum - 1);			
+			if(highKeys != null) return; // null means +infinity
+			    
+			while(cmp.compare(highKeys, 0, page.getBuffer().array(), recordOffset) < 0 && recordNum < frame.getNumRecords()) {				
+			    recordNum++;
+			    recordOffset = frame.getRecordOffset(frame.getNumRecords() - recordNum - 1);				
+			}						
+		}
+	}
+	
+	@Override
+	public void reset() {
+		recordNum = 0;
+		recordOffset = 0;
+		page = null;	
+		searchPred = null;
+	}
+
+    @Override
+    public void setBufferCache(IBufferCache bufferCache) {
+        this.bufferCache = bufferCache;        
+    }
+
+    @Override
+    public void setFileId(int fileId) {
+        this.fileId = fileId;
+    }	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/FieldIterator.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/FieldIterator.java
new file mode 100644
index 0000000..1c903b5
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/FieldIterator.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.frames.FieldPrefixNSMLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFieldAccessor;
+
+//TODO: make members private, only for debugging now
+public class FieldIterator {
+    public int recSlotOff = -1;
+    public int recOff = -1;
+    public int numPrefixFields = 0;
+    public IFieldAccessor[] fields;
+    public FieldPrefixNSMLeaf frame;
+    
+    public int currentField = -1;
+    public int recOffRunner = -1;
+    
+    public FieldIterator(IFieldAccessor[] fields, FieldPrefixNSMLeaf frame) {
+        this.fields = fields;
+        this.frame = frame;
+    }
+    
+    public void setFrame(FieldPrefixNSMLeaf frame) {
+        this.frame = frame;        
+    }
+    
+    public void setFields(IFieldAccessor[] fields) {
+        this.fields = fields;
+    }
+    
+    public void openRecSlotNum(int recSlotNum) {          
+        openRecSlotOff(frame.slotManager.getRecSlotOff(recSlotNum));         
+    }
+    
+    public void openRecSlotOff(int recSlotOff) {
+        this.recSlotOff = recSlotOff;
+        reset();
+    }
+    
+    // re-position to first field
+    public void reset() {
+        currentField = 0;
+        numPrefixFields = 0;
+        int recSlot = frame.getBuffer().getInt(recSlotOff);
+        int prefixSlotNum = frame.slotManager.decodeFirstSlotField(recSlot);
+        recOff = frame.slotManager.decodeSecondSlotField(recSlot);
+                
+        // position to prefix records first (if record is compressed)
+        if(prefixSlotNum != FieldPrefixSlotManager.RECORD_UNCOMPRESSED) {
+            int prefixSlotOff = frame.slotManager.getPrefixSlotOff(prefixSlotNum);
+            int prefixSlot = frame.getBuffer().getInt(prefixSlotOff);
+            numPrefixFields = frame.slotManager.decodeFirstSlotField(prefixSlot);
+            recOffRunner = frame.slotManager.decodeSecondSlotField(prefixSlot);               
+        }
+        else {
+            recOffRunner = recOff;
+        }
+    }
+    
+    public void nextField() {                   
+        // if we have passed the prefix fields of any of the two records, position them to the suffix record
+        if(currentField+1 == numPrefixFields) recOffRunner = recOff;
+        else recOffRunner += fields[currentField].getLength(frame.getBuffer().array(), recOffRunner);
+        currentField++;                        
+    }
+    
+    public int getFieldOff() {
+        return recOffRunner;
+    }        
+    
+    public int getFieldSize() {
+        return fields[currentField].getLength(frame.getBuffer().array(), recOffRunner);                
+    }
+    
+    // TODO: this is the simplest implementation, could be improved to minimize the number calls to System.arrayCopy()
+    // copies the fields [startField, endField] into dest at position destOff
+    // returns the total number of bytes written
+    // this operation does not change the instances state
+    public int copyFields(int startField, int endField, byte[] dest, int destOff) {
+        
+        // remember state
+        int oldCurrentField = currentField;
+        int oldRecOffRunner = recOffRunner;
+        
+        // do we need to reposition from start?
+        if(currentField != startField) {
+            reset();
+            while(currentField != startField) nextField();
+        }
+        
+        // perform copy
+        int bytesWritten = 0;
+        for(int i = startField; i <= endField; i++) {
+            int fieldSize = getFieldSize();                        
+            System.arraycopy(frame.getBuffer().array(), recOffRunner, dest, destOff, fieldSize);
+            bytesWritten += fieldSize;             
+            destOff += fieldSize;
+            nextField();
+        }
+                    
+        // restore original state
+        currentField = oldCurrentField;
+        recOffRunner = oldRecOffRunner;
+                    
+        return bytesWritten;
+    }
+    
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
new file mode 100644
index 0000000..920964e
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/FieldPrefixSlotManager.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.frames.FieldPrefixNSMLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IComparator;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IPrefixSlotManager;
+
+public class FieldPrefixSlotManager implements IPrefixSlotManager {
+	
+	private static final int slotSize = 4;		
+	public static final int RECORD_UNCOMPRESSED = 0xFF;
+	public static final int MAX_PREFIX_SLOTS = 0xFE;
+	public static final int GREATEST_SLOT = 0x00FFFFFF;
+	
+	private ByteBuffer buf;
+	private FieldPrefixNSMLeaf frame;
+	FieldIterator fieldIter = new FieldIterator(null, null);
+	
+	public int decodeFirstSlotField(int slot) {
+		return (slot & 0xFF000000) >>> 24;
+	}
+	
+	public int decodeSecondSlotField(int slot) {
+		return slot & 0x00FFFFFF;
+	}
+	
+	public int encodeSlotFields(int firstField, int secondField) {
+		return ((firstField & 0x000000FF) << 24) | (secondField & 0x00FFFFFF);		
+	}
+	
+	// returns prefix slot number, or RECORD_UNCOMPRESSED of no match was found
+	public int findPrefix(byte[] data, MultiComparator multiCmp) {
+		int prefixMid;
+	    int prefixBegin = 0;
+	    int prefixEnd = frame.getNumPrefixRecords() - 1;
+	     
+	    if(frame.getNumPrefixRecords() > 0) {
+	    	while(prefixBegin <= prefixEnd) {
+	    		prefixMid = (prefixBegin + prefixEnd) / 2;	    			    		
+	    		int prefixSlotOff = getPrefixSlotOff(prefixMid);
+	    		int prefixSlot = buf.getInt(prefixSlotOff);    		
+	    		int numPrefixFields = decodeFirstSlotField(prefixSlot);
+	    		int prefixRecOff = decodeSecondSlotField(prefixSlot);
+	    		int cmp = multiCmp.fieldRangeCompare(data, 0, buf.array(), prefixRecOff, 0, numPrefixFields);
+	    		if(cmp < 0) prefixEnd = prefixMid - 1;				    		
+	    		else if(cmp > 0) prefixBegin = prefixMid + 1;	    		
+	    		else return prefixMid;
+	    	}
+	    }
+	    
+	    return FieldPrefixSlotManager.RECORD_UNCOMPRESSED;
+	}
+	
+	
+	public int findSlot(ByteBuffer buf, byte[] data, MultiComparator multiCmp, boolean exact) {				
+		if(frame.getNumRecords() <= 0) encodeSlotFields(RECORD_UNCOMPRESSED, GREATEST_SLOT);
+		
+	    int prefixMid;
+	    int prefixBegin = 0;
+	    int prefixEnd = frame.getNumPrefixRecords() - 1;
+	    int prefixMatch = RECORD_UNCOMPRESSED;
+	    
+	    // bounds are inclusive on both ends
+	    int recPrefixSlotNumLbound = prefixBegin;
+	    int recPrefixSlotNumUbound = prefixEnd;
+	    
+	    // binary search on the prefix slots to determine upper and lower bounds for the prefixSlotNums in record slots 
+	    if(frame.getNumPrefixRecords() > 0) {
+	    	while(prefixBegin <= prefixEnd) {
+	    		prefixMid = (prefixBegin + prefixEnd) / 2;	    			    		
+	    		int prefixSlotOff = getPrefixSlotOff(prefixMid);
+	    		int prefixSlot = buf.getInt(prefixSlotOff);    		
+	    		int numPrefixFields = decodeFirstSlotField(prefixSlot);
+	    		int prefixRecOff = decodeSecondSlotField(prefixSlot);
+	    		//System.out.println("PREFIX: " + prefixRecOff + " " + buf.getInt(prefixRecOff) + " " + buf.getInt(prefixRecOff+4));
+	    		int cmp = multiCmp.fieldRangeCompare(data, 0, buf.array(), prefixRecOff, 0, numPrefixFields);
+	    		if(cmp < 0) {	    			
+	    		    prefixEnd = prefixMid - 1;
+	    			recPrefixSlotNumLbound = prefixMid - 1;	    				    		
+	    		}
+	    		else if(cmp > 0) {	    			
+	    		    prefixBegin = prefixMid + 1;
+	    			recPrefixSlotNumUbound = prefixMid + 1;
+	    		}
+	    		else {
+	    		    recPrefixSlotNumLbound = prefixMid;
+	    			recPrefixSlotNumUbound = prefixMid;
+	    			prefixMatch = prefixMid;	    			
+	    			break;
+	    		}
+	    	}
+	    }
+	    
+	    //System.out.println("SLOTLBOUND: " + recPrefixSlotNumLbound);
+	    //System.out.println("SLOTUBOUND: " + recPrefixSlotNumUbound);
+	    
+	    int recMid = -1;
+	    int recBegin = 0;
+	    int recEnd = frame.getNumRecords() - 1;
+	    
+	    // binary search on records, guided by the lower and upper bounds on prefixSlotNum
+	    while(recBegin <= recEnd) {
+            recMid = (recBegin + recEnd) / 2;      
+            int recSlotOff = getRecSlotOff(recMid);
+            int recSlot = buf.getInt(recSlotOff);              
+            int prefixSlotNum = decodeFirstSlotField(recSlot);
+            int recOff = decodeSecondSlotField(recSlot);
+            
+            //System.out.println("RECS: " + recBegin + " " + recMid + " " + recEnd);
+            int cmp = 0;
+            if(prefixSlotNum == RECORD_UNCOMPRESSED) {                
+                cmp = multiCmp.compare(data, 0, buf.array(), recOff);                
+            }
+            else {             	
+            	if(prefixSlotNum < recPrefixSlotNumLbound) cmp = 1;
+            	else if(prefixSlotNum > recPrefixSlotNumUbound) cmp = -1;
+            	else cmp = compareCompressed(data, buf.array(), prefixSlotNum, recMid, multiCmp);            	            	
+            }    
+            
+            if(cmp < 0) recEnd = recMid - 1;
+            else if(cmp > 0) recBegin = recMid + 1;
+            else return encodeSlotFields(prefixMatch, recMid);
+        }
+	    
+	    //System.out.println("RECS: " + recBegin + " " + recMid + " " + recEnd);
+	    
+        if(exact) return encodeSlotFields(prefixMatch, GREATEST_SLOT);
+        if(recBegin > frame.getNumRecords() - 1) encodeSlotFields(prefixMatch, GREATEST_SLOT);
+        
+        // do final comparison to determine whether the search key is greater than all keys or in between some existing keys
+        int recSlotOff = getRecSlotOff(recBegin);
+        int recSlot = buf.getInt(recSlotOff);
+        int prefixSlotNum = decodeFirstSlotField(recSlot);
+        int recOff = decodeSecondSlotField(recSlot);
+        
+        int cmp = 0;
+        if(prefixSlotNum == RECORD_UNCOMPRESSED) cmp = multiCmp.compare(data, 0, buf.array(), recOff);        	
+        else cmp = compareCompressed(data, buf.array(), prefixSlotNum, recBegin, multiCmp);
+        
+        if(cmp < 0) return encodeSlotFields(prefixMatch, recBegin);
+        else return encodeSlotFields(prefixMatch, GREATEST_SLOT);
+	}
+	
+	public int compareCompressed(byte[] record, byte[] page, int prefixSlotNum, int recSlotNum, MultiComparator multiCmp) {                          
+         IComparator[] cmps = multiCmp.getComparators();
+         fieldIter.setFields(multiCmp.getFields());
+         fieldIter.setFrame(frame);         
+         fieldIter.openRecSlotNum(recSlotNum);
+         
+         int recRunner = 0;
+         int cmp = 0;
+         for(int i = 0; i < multiCmp.getKeyLength(); i++) {     
+             cmp = cmps[i].compare(record, recRunner, buf.array(), fieldIter.getFieldOff());             
+             if(cmp < 0) return -1;                 
+             else if(cmp > 0) return 1;             
+             fieldIter.nextField();
+             recRunner += multiCmp.getFields()[i].getLength(record, recRunner);
+         }
+         return 0;
+	}
+	
+	public int getPrefixSlotStartOff() {		
+	    return buf.capacity() - slotSize;	    
+	}
+	
+	public int getPrefixSlotEndOff() {
+	    return buf.capacity() - slotSize * frame.getNumPrefixRecords();
+	}
+	
+	public int getRecSlotStartOff() {
+	    return getPrefixSlotEndOff() - slotSize;
+	}
+	
+	public int getRecSlotEndOff() {		
+		return buf.capacity() - slotSize * (frame.getNumPrefixRecords() + frame.getNumRecords());	    
+	}	
+	
+	public int getSlotSize() {
+		return slotSize;
+	}
+	
+	public void setSlot(int offset, int value) {	
+		frame.getBuffer().putInt(offset, value);
+	}
+	
+	public int insertSlot(int slot, int recOff) {
+		int slotNum = decodeSecondSlotField(slot);
+		if(slotNum == GREATEST_SLOT) {			
+			int slotOff = getRecSlotEndOff() - slotSize;
+			int newSlot = encodeSlotFields(decodeFirstSlotField(slot), recOff);
+			setSlot(slotOff, newSlot);
+			//System.out.println("SETTING A: " + slotOff + " " + recOff);
+			return newSlot;
+		}
+		else {
+			int slotEndOff = getRecSlotEndOff();
+			int slotOff = getRecSlotOff(slotNum);
+			int length = (slotOff - slotEndOff) + slotSize;			
+			System.arraycopy(frame.getBuffer().array(), slotEndOff, frame.getBuffer().array(), slotEndOff - slotSize, length);						
+			int newSlot = encodeSlotFields(decodeFirstSlotField(slot), recOff);
+			setSlot(slotOff, newSlot);			
+			//System.out.println("SETTING B: " + slotOff + " " + recOff);
+			return newSlot;
+		}
+	}
+	
+	public void setFrame(FieldPrefixNSMLeaf frame) {
+		this.frame = frame;
+		this.buf = frame.getBuffer();
+	}
+	
+	public int getPrefixSlotOff(int slotNum) {
+		return getPrefixSlotStartOff() - slotNum * slotSize;
+	}
+	
+	public int getRecSlotOff(int slotNum) {
+		return getRecSlotStartOff() - slotNum * slotSize;
+	}
+	
+	public void setPrefixSlot(int slotNum, int slot) {
+		buf.putInt(getPrefixSlotOff(slotNum), slot);
+	}
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
new file mode 100644
index 0000000..d126b59
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IComparator;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFieldAccessor;
+
+public class MultiComparator {
+    
+	private IComparator[] cmps = null;
+	private IFieldAccessor[] fields = null;
+	
+	public MultiComparator(IComparator[] cmps, IFieldAccessor[] fields) {
+		this.cmps = cmps;
+		this.fields = fields;
+	}
+	
+	public IComparator[] getComparators() {
+	    return cmps;
+	}
+	
+	public int getKeyLength() {
+	    return cmps.length;
+	}
+	
+	public void setComparators(IComparator[] cmps) {
+		this.cmps = cmps;
+	}
+	
+	public int size() {
+		return cmps.length;
+	}
+	
+	public void setFields(IFieldAccessor[] fields) {
+		this.fields = fields;
+	}	
+	
+	public IFieldAccessor[] getFields() {
+		return fields;
+	}
+		
+	public int compare(byte[] dataA, int recOffA, byte[] dataB, int recOffB) {
+		for(int i = 0; i < cmps.length; i++) {
+    		int cmp = cmps[i].compare(dataA, recOffA, dataB, recOffB);	
+    		if(cmp < 0) return -1;
+    		else if(cmp > 0) return 1;
+    		recOffA += fields[i].getLength(dataA, recOffA);
+    		recOffB += fields[i].getLength(dataB, recOffB);
+    	}
+    	return 0;
+	}
+	
+	public int fieldRangeCompare(byte[] dataA, int recOffA, byte[] dataB, int recOffB, int startFieldIndex, int numFields) {
+		for(int i = startFieldIndex; i < startFieldIndex + numFields; i++) {
+		    int cmp = cmps[i].compare(dataA, recOffA, dataB, recOffB);
+    		if(cmp < 0) return -1;
+    		else if(cmp > 0) return 1;
+    		recOffA += fields[i].getLength(dataA, recOffA);
+    		recOffB += fields[i].getLength(dataB, recOffB);
+    	}
+    	return 0;
+	}
+	
+	public int getRecordSize(byte[] data, int recOff) {
+		int runner = recOff;
+		for(int i = 0; i < fields.length; i++) {
+			runner += fields[i].getLength(data, runner);
+		}
+		return runner - recOff;
+	}
+	
+	public int getKeySize(byte[] data, int recOff) {
+		int runner = recOff;
+		for(int i = 0; i < cmps.length; i++) {
+			runner += fields[i].getLength(data, runner);
+		}
+		return runner - recOff;
+	}		
+	
+	public String printRecord(byte[] data, int recOff) {
+		StringBuilder strBuilder = new StringBuilder();
+	    int runner = recOff;
+		for(int i = 0; i < fields.length; i++) {
+			strBuilder.append(fields[i].print(data, runner) + " ");
+			runner += fields[i].getLength(data, runner);
+		}
+		return strBuilder.toString();
+	}
+	
+	public String printKey(byte[] data, int recOff) {
+		StringBuilder strBuilder = new StringBuilder();		
+		int runner = recOff;
+		for(int i = 0; i < cmps.length; i++) {
+			strBuilder.append(fields[i].print(data, runner) + " ");			
+			runner += fields[i].getLength(data, runner);
+		}
+		return strBuilder.toString();
+	}		
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/NSMFrame.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/NSMFrame.java
new file mode 100644
index 0000000..97721db
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/NSMFrame.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFrame;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.SpaceStatus;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class NSMFrame implements IFrame {	
+	
+	protected static final int pageLsnOff = 0;								// 0
+	protected static final int numRecordsOff = pageLsnOff + 4;				// 4
+	protected static final int freeSpaceOff = numRecordsOff + 4;			// 8
+	protected static final int totalFreeSpaceOff = freeSpaceOff + 4;		// 16
+	
+	protected ICachedPage page = null;
+	protected ByteBuffer buf = null;
+	protected ISlotManager slotManager;
+	
+	public NSMFrame() {
+		// TODO: hardcode slotManager for now...		
+		this.slotManager = new OrderedSlotManager();
+	}
+	
+	@Override
+	public void setPage(ICachedPage page) {
+		this.page = page;
+		this.buf = page.getBuffer();
+		slotManager.setFrame(this);
+	}
+	
+	@Override
+	public ByteBuffer getBuffer() {
+		return page.getBuffer();
+	}
+	
+	@Override
+	public ICachedPage getPage() {
+		return page;
+	}
+	
+	@Override
+	public void compact(MultiComparator cmp) {		
+		resetSpaceParams();		
+		
+		int numRecords = buf.getInt(numRecordsOff);
+		int freeSpace = buf.getInt(freeSpaceOff);
+		byte[] data = buf.array();
+		
+		ArrayList<SlotOffRecOff> sortedRecOffs = new ArrayList<SlotOffRecOff>();
+		sortedRecOffs.ensureCapacity(numRecords);
+		for(int i = 0; i < numRecords; i++) {			
+			int slotOff = slotManager.getSlotOff(i);
+			int recOff = slotManager.getRecOff(slotOff);
+			sortedRecOffs.add(new SlotOffRecOff(slotOff, recOff));
+		}
+		Collections.sort(sortedRecOffs);
+		
+		for(int i = 0; i < sortedRecOffs.size(); i++) {
+			int recOff = sortedRecOffs.get(i).recOff;
+			int recSize = cmp.getRecordSize(data, recOff);
+			System.arraycopy(data, recOff, data, freeSpace, recSize);
+			slotManager.setSlot(sortedRecOffs.get(i).slotOff, freeSpace);
+			freeSpace += recSize;
+		}
+		
+		buf.putInt(freeSpaceOff, freeSpace);
+		buf.putInt(totalFreeSpaceOff, buf.capacity() - freeSpace - numRecords * slotManager.getSlotSize());				
+	}
+
+	@Override
+	public void delete(byte[] data, MultiComparator cmp, boolean exactDelete) throws Exception {		
+		int slotOff = slotManager.findSlot(buf, data, cmp, true);
+		if(slotOff < 0) {
+			throw new BTreeException("Key to be deleted does not exist.");
+		}
+		else {
+			if(exactDelete) {
+				// check the non-key columns for equality by byte-by-byte comparison
+				int storedRecOff = slotManager.getRecOff(slotOff);
+				int storedRecSize = cmp.getRecordSize(buf.array(), storedRecOff);						
+				if(data.length != storedRecSize) {
+					throw new BTreeException("Cannot delete record. Given record size does not match candidate record.");
+				}
+				for(int i = 0; i < storedRecSize; i++) {
+					if(data[i] != buf.array()[storedRecOff+i]) {
+						throw new BTreeException("Cannot delete record. Byte-by-byte comparison failed to prove equality.");
+					}				
+				}
+			}
+			
+			int recOff = slotManager.getRecOff(slotOff);
+			int recSize = cmp.getRecordSize(buf.array(), recOff);
+			
+			// perform deletion (we just do a memcpy to overwrite the slot)
+			int slotStartOff = slotManager.getSlotStartOff();
+			int length = slotOff - slotStartOff;
+			System.arraycopy(buf.array(), slotStartOff, buf.array(), slotStartOff + slotManager.getSlotSize(), length);
+						
+			// maintain space information
+			buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) - 1);
+			buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + recSize + slotManager.getSlotSize());			
+		}
+	}
+	
+	@Override
+	public SpaceStatus hasSpaceInsert(byte[] data, MultiComparator cmp) {				
+		if(data.length + slotManager.getSlotSize() <= buf.capacity() - buf.getInt(freeSpaceOff) - (buf.getInt(numRecordsOff) * slotManager.getSlotSize()) ) return SpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE;
+		else if(data.length + slotManager.getSlotSize() <= buf.getInt(totalFreeSpaceOff)) return SpaceStatus.SUFFICIENT_SPACE;
+		else return SpaceStatus.INSUFFICIENT_SPACE;
+	}
+	
+	@Override
+	public SpaceStatus hasSpaceUpdate(int rid, byte[] data, MultiComparator cmp) {
+		// TODO Auto-generated method stub
+		return SpaceStatus.INSUFFICIENT_SPACE;
+	}
+
+	protected void resetSpaceParams() {
+		buf.putInt(freeSpaceOff, totalFreeSpaceOff + 4);
+		buf.putInt(totalFreeSpaceOff, buf.capacity() - (totalFreeSpaceOff + 4));
+	}
+	
+	@Override
+	public void initBuffer(byte level) {		
+		buf.putInt(pageLsnOff, 0); // TODO: might to set to a different lsn during creation
+		buf.putInt(numRecordsOff, 0);	
+		resetSpaceParams();
+	}
+	
+	@Override
+	public void insert(byte[] data, MultiComparator cmp) throws Exception {
+		int slotOff = slotManager.findSlot(buf, data, cmp, false);
+		slotOff = slotManager.insertSlot(slotOff, buf.getInt(freeSpaceOff));
+		
+		int recOff = buf.getInt(freeSpaceOff);
+		System.arraycopy(data, 0, buf.array(), recOff, data.length);		
+					
+		buf.putInt(numRecordsOff, buf.getInt(numRecordsOff) + 1);
+		buf.putInt(freeSpaceOff, buf.getInt(freeSpaceOff) + data.length);
+		buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) - data.length - slotManager.getSlotSize());				
+	}
+	
+	@Override
+	public void update(int rid, byte[] data) throws Exception {
+		// TODO Auto-generated method stub
+		
+	}	
+	
+	@Override
+	public void printHeader() {
+		// TODO Auto-generated method stub
+		
+	}		
+		
+	@Override
+	public int getNumRecords() {
+		return buf.getInt(numRecordsOff);
+	}
+
+	public ISlotManager getSlotManager() {
+		return slotManager;
+	}
+	
+	@Override
+	public String printKeys(MultiComparator cmp) {		
+		StringBuilder strBuilder = new StringBuilder();		
+		int numRecords = buf.getInt(numRecordsOff);
+		for(int i = 0; i < numRecords; i++) {			
+			int recOff = slotManager.getRecOff(slotManager.getSlotOff(i));
+			for(int j = 0; j < cmp.size(); j++) {				
+				strBuilder.append(cmp.getFields()[j].print(buf.array(), recOff) + " ");
+				recOff += cmp.getFields()[j].getLength(buf.array(), recOff);
+			}
+			strBuilder.append(" | ");				
+		}
+		strBuilder.append("\n");
+		return strBuilder.toString();
+	}
+	
+	@Override
+	public int getRecordOffset(int slotNum) {
+		return slotManager.getRecOff(slotManager.getSlotEndOff() - slotNum * slotManager.getSlotSize());
+	}
+
+	@Override
+	public int getPageLsn() {
+		return buf.getInt(pageLsnOff);		
+	}
+
+	@Override
+	public void setPageLsn(int pageLsn) {
+		buf.putInt(pageLsnOff, pageLsn);		
+	}
+
+	@Override
+	public int getTotalFreeSpace() {
+		return buf.getInt(totalFreeSpaceOff);
+	}
+
+    @Override
+    public void compress(MultiComparator cmp) {
+        
+    }
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/NodeFrontier.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/NodeFrontier.java
new file mode 100644
index 0000000..cd27d97
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/NodeFrontier.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class NodeFrontier {
+	public int bytesInserted;
+	public ICachedPage page;
+	public int pageId;
+	public byte[] lastRecord;
+}
+
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/OrderedSlotManager.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/OrderedSlotManager.java
new file mode 100644
index 0000000..9cb2c24
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/OrderedSlotManager.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.IFrame;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManager;
+
+public class OrderedSlotManager implements ISlotManager {
+	
+	private static final int slotSize = 4;
+	private IFrame frame;
+	
+	// TODO: mix in interpolation search
+	@Override
+	public int findSlot(ByteBuffer buf, byte[] data, MultiComparator multiCmp, boolean exact) {
+		int mid;
+		int begin = getSlotStartOff();
+		int end = getSlotEndOff();
+				
+        // deal with first record insertion
+		if(begin == buf.capacity()) return -1;		
+		
+        while(begin <= end) {
+            mid = (begin + end) >> 1;
+        	mid -= mid % slotSize;        	
+        	int recOff = getRecOff(mid);
+        	int cmp = multiCmp.compare(data, 0, buf.array(), recOff);
+        	if(cmp < 0)
+        		begin = mid + slotSize;
+        	else if(cmp > 0)
+        		end = mid - slotSize;
+        	else
+        		return mid;
+        }
+                        
+        if(exact) return -1;        
+        if(end < getSlotStartOff()) return -1;        
+        if(multiCmp.compare(data, 0, buf.array(), getRecOff(end))  < 0)
+        	return end;
+        else
+        	return -1;		
+	}
+	
+	@Override
+	public int getRecOff(int offset) {		
+		return frame.getBuffer().getInt(offset);
+	}
+	
+	@Override
+	public void setSlot(int offset, int value) {	
+		frame.getBuffer().putInt(offset, value);
+	}
+	
+	@Override
+	public int getSlotStartOff() {
+		return frame.getBuffer().capacity() - (frame.getNumRecords() * slotSize);
+	}
+	
+	@Override
+	public int getSlotEndOff() {
+		return frame.getBuffer().capacity() - slotSize;
+	}
+
+	@Override
+	public int getSlotSize() {
+		return slotSize;
+	}
+	
+	@Override
+	public int insertSlot(int slotOff, int recOff) {
+		if(slotOff < 0) {
+			slotOff = getSlotStartOff() - slotSize;
+			setSlot(slotOff, recOff);
+			return slotOff;
+		}
+		else {
+			int slotStartOff = getSlotStartOff();
+			int length = (slotOff - slotStartOff) + slotSize;			
+			System.arraycopy(frame.getBuffer().array(), slotStartOff, frame.getBuffer().array(), slotStartOff - slotSize, length);			
+			setSlot(slotOff, recOff);
+			return slotOff;
+		}		
+	}
+	
+	@Override
+	public void setFrame(IFrame frame) {
+		this.frame = frame;		
+	}
+
+	@Override
+	public int getSlotOff(int slotNum) {
+		return getSlotEndOff() - slotNum * slotSize;
+	}	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/OrderedSlotManagerFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/OrderedSlotManagerFactory.java
new file mode 100644
index 0000000..2daa514
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/OrderedSlotManagerFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManager;
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISlotManagerFactory;
+
+public class OrderedSlotManagerFactory implements ISlotManagerFactory {
+	
+	@Override
+	public ISlotManager getSlotManager() {
+		return new OrderedSlotManager();
+	}	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
new file mode 100644
index 0000000..8f85a1f
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.interfaces.ISearchPredicate;
+
+public class RangePredicate implements ISearchPredicate {
+	
+	protected boolean isForward = true;
+	protected byte[] lowKeys = null;
+	protected byte[] highKeys = null;
+	protected MultiComparator cmp;
+	
+	public RangePredicate() {
+	}
+	
+	// TODO: for now range is [lowKey, highKey] but depending on user predicate the range could be exclusive on any end
+	// need to model this somehow	
+	// for point queries just use same value for low and high key
+	public RangePredicate(boolean isForward, byte[] lowKeys, byte[] highKeys, MultiComparator cmp) {
+		this.isForward = isForward;
+		this.lowKeys = lowKeys;
+		this.highKeys = highKeys;
+		this.cmp = cmp;
+	}
+	
+	public MultiComparator getComparator() {
+		return cmp;
+	}
+	
+	public void setComparator(MultiComparator cmp) {
+		this.cmp = cmp;
+	}
+	
+	public boolean isForward() {
+		return isForward;
+	}	
+	
+	public byte[] getLowKeys() {
+		return lowKeys;
+	}
+	
+	public byte[] getHighKeys() {
+		return highKeys;
+	}
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/SlotOffRecOff.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/SlotOffRecOff.java
new file mode 100644
index 0000000..c198527
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/SlotOffRecOff.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+public class SlotOffRecOff implements Comparable<SlotOffRecOff> {
+	public int slotOff;
+	public int recOff;
+	
+	public SlotOffRecOff(int slotOff, int recOff) {
+		this.slotOff = slotOff;
+		this.recOff = recOff;
+	}
+	
+	@Override
+	public int compareTo(SlotOffRecOff o) {			
+		return recOff - o.recOff;
+	}
+}
+
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/SplitKey.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/SplitKey.java
new file mode 100644
index 0000000..0088277
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/SplitKey.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.impls;
+
+import java.nio.ByteBuffer;
+
+public class SplitKey {		
+	private byte[] data = null;	
+	private ByteBuffer buf = null;
+	
+	public void initData(int keySize) {
+		// try to reuse existing memory from a lower-level split if possible
+		// in general this is always possible for fixed-sized keys		
+		/*
+		if(data != null) {
+			if(data.length < keySize + 8) {
+				data = new byte[keySize + 8]; // add 8 for left and right page
+				buf = ByteBuffer.wrap(data);
+			}				
+		}
+		else { 
+			data = new byte[keySize + 8]; // add 8 for left and right page
+			buf = ByteBuffer.wrap(data);
+		}
+		*/		
+		data = new byte[keySize + 8]; // add 8 for left and right page
+		buf = ByteBuffer.wrap(data);
+	}
+	
+	public void reset() {
+		data = null;
+		buf = null;
+	}
+	
+	public void setData(byte[] data) {
+		this.data = data;
+	}
+	
+	public byte[] getData() {
+		return data;
+	}
+	
+	public int getLeftPage() {
+		return buf.getInt(data.length - 8);
+	}
+	
+	public int getRightPage() {
+		return buf.getInt(data.length - 4);
+	}
+		
+	public void setLeftPage(int leftPage) {
+		buf.putInt(data.length - 8, leftPage);
+	}
+	
+	public void setRightPage(int rightPage) {
+		buf.putInt(data.length - 4, rightPage);
+	}
+	
+	public void setPages(int leftPage, int rightPage) {
+		buf.putInt(data.length - 8, leftPage);
+		buf.putInt(data.length - 4, rightPage);
+	}
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeCursor.java
new file mode 100644
index 0000000..e05dbb7
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeCursor.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public interface IBTreeCursor {
+	public void reset();
+	public boolean hasNext() throws Exception;
+	public int getOffset();
+	public void next() throws Exception;	
+	public void open(ICachedPage page, ISearchPredicate searchPred) throws Exception;
+	public ICachedPage getPage();
+	public void close() throws Exception;
+	public void setBufferCache(IBufferCache bufferCache);
+	public void setFileId(int fileId);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrame.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrame.java
new file mode 100644
index 0000000..8afe219
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrame.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.SplitKey;
+
+public interface IBTreeFrame extends IFrame {
+	// TODO; what if records more than half-page size?
+	public int split(IBTreeFrame rightFrame, byte[] data, MultiComparator cmp, SplitKey splitKey) throws Exception;		
+	
+	// TODO: check if we do something nicer than returning object
+	public ISlotManager getSlotManager();
+	
+	// ATTENTION: in b-tree operations it may not always be possible to determine whether an ICachedPage is a leaf or interior node
+	// a compatible interior and leaf implementation MUST return identical values when given the same ByteBuffer for the functions below	
+	public boolean isLeaf();
+	public byte getLevel();
+	public void setLevel(byte level);	
+	public boolean getSmFlag(); // structure modification flag
+	public void setSmFlag(boolean smFlag);	
+	
+	//public int getNumPrefixRecords();
+	//public void setNumPrefixRecords(int numPrefixRecords);
+	
+	public void insertSorted(byte[] data, MultiComparator cmp) throws Exception;
+	
+	// for debugging
+	public int getFreeSpaceOff();
+	public void setFreeSpaceOff(int freeSpace);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameFactory.java
new file mode 100644
index 0000000..e0fd99f
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IBTreeFrameFactory {	
+	public IBTreeFrame getFrame(ISlotManager slotManager);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameInterior.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameInterior.java
new file mode 100644
index 0000000..677a2a9
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameInterior.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+
+public interface IBTreeFrameInterior extends IBTreeFrame {
+	//public int getChildPageId(IFieldAccessor[] fields, MultiComparator cmp);
+	public int getChildPageId(RangePredicate pred, MultiComparator srcCmp);
+	public int getLeftmostChildPageId(MultiComparator cmp);
+	public int getRightmostChildPageId(MultiComparator cmp);
+	public void setRightmostChildPageId(int pageId);
+	public void deleteGreatest(MultiComparator cmp);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameInteriorFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameInteriorFactory.java
new file mode 100644
index 0000000..4296607
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameInteriorFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IBTreeFrameInteriorFactory {	
+	public IBTreeFrameInterior getFrame();
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameLeaf.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameLeaf.java
new file mode 100644
index 0000000..f66ce73
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameLeaf.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IBTreeFrameLeaf extends IBTreeFrame {	
+	public void setNextLeaf(int nextPage);
+	public int getNextLeaf();
+	
+	public void setPrevLeaf(int prevPage);
+	public int getPrevLeaf();
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameLeafFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameLeafFactory.java
new file mode 100644
index 0000000..fb15046
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameLeafFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IBTreeFrameLeafFactory {
+    public IBTreeFrameLeaf getFrame();
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameMeta.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameMeta.java
new file mode 100644
index 0000000..f430d38
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameMeta.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public interface IBTreeFrameMeta {
+    public void initBuffer(int level);
+    
+    public void setPage(ICachedPage page);
+    public ICachedPage getPage();
+    
+    public byte getLevel();
+    public void setLevel(byte level);
+    
+    public int getNextPage();
+    public void setNextPage(int nextPage);
+    
+    public int getMaxPage();
+    public void setMaxPage(int maxPage);
+    
+    public int getFreePage();
+    public boolean hasSpace();
+    public void addFreePage(int freePage);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameMetaFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameMetaFactory.java
new file mode 100644
index 0000000..27b61cd
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeFrameMetaFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IBTreeFrameMetaFactory {   
+    public IBTreeFrameMeta getFrame();
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeSlotManager.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeSlotManager.java
new file mode 100644
index 0000000..443ffc4
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IBTreeSlotManager.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+// a slot consists of two fields:
+// first field is 1 byte, it indicates the slot number of a prefix record
+// we call the first field prefixSlotOff
+// second field is 3 bytes, it points to the start offset of a record
+// we call the second field recOff
+
+// we distinguish between two slot types:
+// prefix slots that point to prefix records, 
+// a frame is assumed to have a field numPrefixRecords
+// record slots that point to data records
+// a frame is assumed to have a field numRecords
+// a record slot contains a record pointer and a pointer to a prefix slot (prefix slot number) 
+
+// INSERT procedure
+// a record insertion may use an existing prefix record 
+// a record insertion may never create a new prefix record
+// modifying the prefix slots would be extremely expensive because: 
+// potentially all records slots would have to change their prefix slot pointers
+// all prefixes are recomputed during a reorg or compaction
+
+public interface IBTreeSlotManager {
+	public void setFrame(IBTreeFrame frame);		
+	
+	public int decodeFirstSlotField(int slot);
+	public int decodeSecondSlotField(int slot);		
+	public int encodeSlotFields(int firstField, int secondField);
+	
+	// TODO: first argument can be removed. frame must be set and buffer can be gotten from there
+	public int findSlot(ByteBuffer buf, byte[] data, MultiComparator multiCmp, boolean exact);
+	public int insertSlot(int slotOff, int recOff);
+					
+	public int getRecSlotStartOff();
+	public int getRecSlotEndOff();
+	
+	public int getPrefixSlotStartOff();
+	public int getPrefixSlotEndOff();
+	
+	public int getRecSlotOff(int slotNum);
+	public int getPrefixSlotOff(int slotNum);	
+		
+	public int getSlotSize();		
+	
+	// functions for testing
+	public void setPrefixSlot(int slotNum, int slot);
+	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IComparator.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IComparator.java
new file mode 100644
index 0000000..b26f799
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IComparator.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IComparator {		
+	public int compare(byte[] dataA, int recOffA, byte[] dataB, int recOffB);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFieldAccessor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFieldAccessor.java
new file mode 100644
index 0000000..65d4802
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFieldAccessor.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IFieldAccessor {	    
+    public int getLength(byte[] data, int offset); // skip to next field (equivalent to adding length of field to offset)        
+	public String print(byte[] data, int offset); // debug
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFieldAccessorFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFieldAccessorFactory.java
new file mode 100644
index 0000000..a5a4603
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFieldAccessorFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface IFieldAccessorFactory {
+	public IFieldAccessor getKey();
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFrame.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFrame.java
new file mode 100644
index 0000000..90142e4
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFrame.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public interface IFrame {	
+	public void setPage(ICachedPage page);
+	public ICachedPage getPage();
+	public ByteBuffer getBuffer();
+	
+	public void insert(byte[] data, MultiComparator cmp) throws Exception;
+	public void update(int rid, byte[] data) throws Exception;
+	public void delete(byte[] data, MultiComparator cmp, boolean exactDelete) throws Exception;
+	
+	public void compact(MultiComparator cmp);
+	public void compress(MultiComparator cmp) throws Exception;
+	
+	public void initBuffer(byte level);
+	
+	public int getNumRecords();
+		
+	// assumption: page must be write-latched at this point
+	public SpaceStatus hasSpaceInsert(byte[] data, MultiComparator cmp);
+	public SpaceStatus hasSpaceUpdate(int rid, byte[] data, MultiComparator cmp);
+	
+	public int getRecordOffset(int slotNum);
+	
+	public int getTotalFreeSpace();	
+	
+	public void setPageLsn(int pageLsn);
+	public int getPageLsn();
+	
+	// for debugging
+	public void printHeader();
+	public String printKeys(MultiComparator cmp);
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFrameCompressor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFrameCompressor.java
new file mode 100644
index 0000000..88f3fae
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IFrameCompressor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import edu.uci.ics.hyracks.storage.am.btree.frames.FieldPrefixNSMLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+public interface IFrameCompressor {
+	public void compress(FieldPrefixNSMLeaf frame, MultiComparator cmp) throws Exception;
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IPrefixSlotManager.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IPrefixSlotManager.java
new file mode 100644
index 0000000..7996f1a
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/IPrefixSlotManager.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.frames.FieldPrefixNSMLeaf;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+// a slot consists of two fields:
+// first field is 1 byte, it indicates the slot number of a prefix record
+// we call the first field prefixSlotOff
+// second field is 3 bytes, it points to the start offset of a record
+// we call the second field recOff
+
+// we distinguish between two slot types:
+// prefix slots that point to prefix records, 
+// a frame is assumed to have a field numPrefixRecords
+// record slots that point to data records
+// a frame is assumed to have a field numRecords
+// a record slot contains a record pointer and a pointer to a prefix slot (prefix slot number) 
+
+// INSERT procedure
+// a record insertion may use an existing prefix record 
+// a record insertion may never create a new prefix record
+// modifying the prefix slots would be extremely expensive because: 
+// potentially all records slots would have to change their prefix slot pointers
+// all prefixes are recomputed during a reorg or compaction
+
+public interface IPrefixSlotManager {
+	public void setFrame(FieldPrefixNSMLeaf frame);
+	
+	public int decodeFirstSlotField(int slot);
+	public int decodeSecondSlotField(int slot);		
+	public int encodeSlotFields(int firstField, int secondField);
+	
+	// TODO: first argument can be removed. frame must be set and buffer can be gotten from there
+	public int findSlot(ByteBuffer buf, byte[] data, MultiComparator multiCmp, boolean exact);	
+	public int insertSlot(int slot, int recOff);
+					
+	// returns prefix slot number, returns RECORD_UNCOMPRESSED if none found
+	public int findPrefix(byte[] data, MultiComparator multiCmp);
+	
+	public int getRecSlotStartOff();
+	public int getRecSlotEndOff();
+	
+	public int getPrefixSlotStartOff();
+	public int getPrefixSlotEndOff();
+	
+	public int getRecSlotOff(int slotNum);
+	public int getPrefixSlotOff(int slotNum);	
+		
+	public int getSlotSize();		
+	
+	public void setSlot(int offset, int value);
+	
+	public int compareCompressed(byte[] record, byte[] page, int prefixSlotNum, int recSlotNum, MultiComparator multiCmp);
+	
+	// functions for testing
+	public void setPrefixSlot(int slotNum, int slot);
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISearchPredicate.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISearchPredicate.java
new file mode 100644
index 0000000..b04bca1
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISearchPredicate.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface ISearchPredicate {
+	public boolean isForward();	
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISlotManager.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISlotManager.java
new file mode 100644
index 0000000..45aa5e1
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISlotManager.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+
+public interface ISlotManager {
+	public void setFrame(IFrame frame);
+	
+	// TODO: first argument can be removed. frame must be set and buffer can be gotten from there
+	public int findSlot(ByteBuffer buf, byte[] data, MultiComparator multiCmp, boolean exact);
+	public int insertSlot(int slotOff, int recOff);
+	
+	public int getSlotStartOff();
+	public int getSlotEndOff();
+	
+	public int getRecOff(int slotOff);		
+	public void setSlot(int slotOff, int value);	
+	
+	public int getSlotOff(int slotNum);
+	
+	public int getSlotSize();
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISlotManagerFactory.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISlotManagerFactory.java
new file mode 100644
index 0000000..fc8b8b4
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/ISlotManagerFactory.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public interface ISlotManagerFactory {
+	public ISlotManager getSlotManager();
+}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/SpaceStatus.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/SpaceStatus.java
new file mode 100644
index 0000000..15496dd
--- /dev/null
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/interfaces/SpaceStatus.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.btree.interfaces;
+
+public enum SpaceStatus {
+	INSUFFICIENT_SPACE,
+	SUFFICIENT_CONTIGUOUS_SPACE,
+	SUFFICIENT_SPACE
+}
\ No newline at end of file