added btree search and btree bulkload operators and made some interfaces serializable

git-svn-id: https://hyracks.googlecode.com/svn/trunk@100 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index 06cac47..5d91b5a 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -39,6 +39,13 @@
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-dataflow-std</artifactId>
+  		<version>0.1.2-SNAPSHOT</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>  	
+  	<dependency>
   		<groupId>junit</groupId>
   		<artifactId>junit</artifactId>
   		<version>4.8.1</version>
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java
index a9d7805..8383fed 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeInteriorFrameFactory.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.btree.api;
 
-public interface IBTreeInteriorFrameFactory {	
+import java.io.Serializable;
+
+public interface IBTreeInteriorFrameFactory extends Serializable {	
 	public IBTreeInteriorFrame getFrame();
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java
index 591f81d..0855a72 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IBTreeLeafFrameFactory.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.btree.api;
 
-public interface IBTreeLeafFrameFactory {	
+import java.io.Serializable;
+
+public interface IBTreeLeafFrameFactory extends Serializable {	
 	public IBTreeLeafFrame getFrame();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java
index a076f0f..55a25ea 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/IFieldAccessor.java
@@ -15,7 +15,9 @@
 
 package edu.uci.ics.hyracks.storage.am.btree.api;
 
-public interface IFieldAccessor {	    
+import java.io.Serializable;
+
+public interface IFieldAccessor extends Serializable {	    
     public int getLength(byte[] data, int offset); // skip to next field (equivalent to adding length of field to offset)        
 	public String print(byte[] data, int offset); // debug
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java
index e6c7509..334f2f3 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/api/ISearchPredicate.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.btree.api;
 
-public interface ISearchPredicate {
+import java.io.Serializable;
+
+public interface ISearchPredicate extends Serializable {
 	public boolean isForward();	
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
new file mode 100644
index 0000000..72a9f12
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorDescriptor.java
@@ -0,0 +1,76 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+
+public abstract class AbstractBTreeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+	
+	private static final long serialVersionUID = 1L;
+	
+	protected String btreeFileName;
+	protected int btreeFileId;
+	
+	protected MultiComparator cmp;	
+	protected RangePredicate rangePred;
+	
+	protected IBTreeInteriorFrameFactory interiorFrameFactory;
+	protected IBTreeLeafFrameFactory leafFrameFactory;
+	
+	protected IBufferCacheProvider bufferCacheProvider;
+	protected IBTreeRegistryProvider btreeRegistryProvider;
+	
+	public AbstractBTreeOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity, IFileSplitProvider fileSplitProvider, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider,  int btreeFileId, String btreeFileName, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, MultiComparator cmp, RangePredicate rangePred) {
+        super(spec, inputArity, outputArity);
+        this.cmp = cmp;
+        this.rangePred = rangePred;
+        this.btreeFileId = btreeFileId;
+        this.btreeFileName = btreeFileName;
+        this.bufferCacheProvider = bufferCacheProvider;
+        this.btreeRegistryProvider = btreeRegistryProvider;        
+        this.interiorFrameFactory = interiorFactory;
+        this.leafFrameFactory = leafFactory;
+        if(outputArity > 0) recordDescriptors[0] = recDesc;   
+    }
+
+	public String getBtreeFileName() {
+		return btreeFileName;
+	}
+
+	public int getBtreeFileId() {
+		return btreeFileId;
+	}
+
+	public MultiComparator getMultiComparator() {
+		return cmp;
+	}
+
+	public RangePredicate getRangePredicate() {
+		return rangePred;
+	}
+
+	public IBTreeInteriorFrameFactory getInteriorFactory() {
+		return interiorFrameFactory;
+	}
+
+	public IBTreeLeafFrameFactory getLeafFactory() {
+		return leafFrameFactory;
+	}
+
+	public IBufferCacheProvider getBufferCacheProvider() {
+		return bufferCacheProvider;
+	}
+
+	public IBTreeRegistryProvider getBtreeRegistryProvider() {
+		return btreeRegistryProvider;
+	}
+	
+	public RecordDescriptor getRecordDescriptor() {
+		return recordDescriptors[0];    
+	}
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
new file mode 100644
index 0000000..ef0ab4d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/AbstractBTreeOperatorNodePushable.java
@@ -0,0 +1,152 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.util.Random;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.FileInfo;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+public abstract class AbstractBTreeOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
+		
+	protected IBTreeInteriorFrame interiorFrame;
+	protected IBTreeLeafFrame leafFrame;
+	
+	protected BTree btree;
+	
+	protected AbstractBTreeOperatorDescriptor opDesc;	
+	protected IHyracksContext ctx;
+	
+	public AbstractBTreeOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, final IHyracksContext ctx) {
+		this.opDesc = opDesc;
+		this.ctx = ctx;
+	}
+	
+	public void init() throws FileNotFoundException {
+		IBufferCache bufferCache = opDesc.getBufferCacheProvider().getBufferCache();
+		FileManager fileManager = opDesc.getBufferCacheProvider().getFileManager();
+		
+        File f = new File(opDesc.getBtreeFileName());
+        RandomAccessFile raf = new RandomAccessFile(f, "rw");
+        
+        try {
+        	FileInfo fi = new FileInfo(opDesc.getBtreeFileId(), raf);
+        	fileManager.registerFile(fi);
+        }
+        catch (Exception e) {
+        }
+        
+		BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry();
+		btree = btreeRegistry.get(opDesc.getBtreeFileId());
+        if(btree == null) {
+        	
+        	// create new btree and register it            
+            btreeRegistry.lock();
+            try {
+                // check if btree has already been registered by another thread
+                btree = btreeRegistry.get(opDesc.getBtreeFileId());
+                if(btree == null) {
+                                    	
+                	// this thread should create and register the btee                                        
+                    btree = new BTree(bufferCache, 
+                    		opDesc.getInteriorFactory(), 
+                    		opDesc.getLeafFactory(), 
+                    		opDesc.getMultiComparator());
+                    btree.open(opDesc.getBtreeFileId());
+                    btreeRegistry.register(opDesc.getBtreeFileId(), btree);
+                }                
+            }
+            finally {                        
+                btreeRegistry.unlock();
+            }
+        }            
+                
+        interiorFrame = opDesc.getInteriorFactory().getFrame();
+        leafFrame = opDesc.getLeafFactory().getFrame();    	
+	}	
+	
+	// debug
+	protected void fill() throws Exception {
+		MetaDataFrame metaFrame = new MetaDataFrame();
+		
+		btree.create(opDesc.getBtreeFileId(), leafFrame, metaFrame);
+		
+		Random rnd = new Random();
+		rnd.setSeed(50);				
+		
+		for (int i = 0; i < 10000; i++) {
+			ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+			DataOutputStream dos = new DataOutputStream(baaos);        	        	
+
+			int f0 = rnd.nextInt() % 10000;
+			int f1 = 5;
+
+			IntegerSerializerDeserializer.INSTANCE.serialize(f0, dos);
+			IntegerSerializerDeserializer.INSTANCE.serialize(f1, dos);
+
+			byte[] record = baaos.toByteArray();
+
+			if (i % 1000 == 0) {
+				System.out.println("INSERTING " + i + " : " + f0 + " " + f1);            	
+			}
+
+			try {                                
+				btree.insert(record, leafFrame, interiorFrame, metaFrame);
+			} catch (Exception e) {
+			}
+		}
+
+		/*
+        IFieldAccessor[] fields = new IFieldAccessor[2];
+        fields[0] = new Int32Accessor(); // key field
+        fields[1] = new Int32Accessor(); // value field
+
+        int keyLen = 1;
+        IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
+        cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator cmp = new MultiComparator(cmps, fields);		
+
+        ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream lkdos = new DataOutputStream(lkbaaos);    	    	    	
+        IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
+
+        ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
+        DataOutputStream hkdos = new DataOutputStream(hkbaaos);    	    	    	
+        IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
+
+        byte[] lowKey = lkbaaos.toByteArray();
+        byte[] highKey = hkbaaos.toByteArray();
+
+        IBinaryComparator[] searchCmps = new IBinaryComparator[1];
+        searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
+
+        RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
+        btree.search(cursor, rangePred, leafFrame, interiorFrame);
+        try {
+            while (cursor.hasNext()) {
+            	cursor.next();
+                byte[] array = cursor.getPage().getBuffer().array();
+                int recOffset = cursor.getOffset();                
+                String rec = cmp.printRecord(array, recOffset);
+                System.out.println(rec);         
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            cursor.close();
+        }	
+        */	 
+	}
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
new file mode 100644
index 0000000..c559cf6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorDescriptor.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+public class BTreeBulkLoadOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private final int[] keyFields;
+	private final int[] payloadFields;
+	
+    private float fillFactor;
+	
+	public BTreeBulkLoadOperatorDescriptor(JobSpecification spec,
+			IFileSplitProvider fileSplitProvider, RecordDescriptor recDesc,
+			IBufferCacheProvider bufferCacheProvider,
+			IBTreeRegistryProvider btreeRegistryProvider, int btreeFileId,
+			String btreeFileName, IBTreeInteriorFrameFactory interiorFactory,
+			IBTreeLeafFrameFactory leafFactory, MultiComparator cmp, 
+			int[] keyFields, int[] payloadFields, float fillFactor) {
+		super(spec, 1, 0, fileSplitProvider, recDesc, bufferCacheProvider,
+				btreeRegistryProvider, btreeFileId, btreeFileName, interiorFactory,
+				leafFactory, cmp, null);
+		this.keyFields = keyFields;
+		this.payloadFields = payloadFields;
+		this.fillFactor = fillFactor;
+	}
+	
+	@Override
+	public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
+			IOperatorEnvironment env,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		return new BTreeBulkLoadOperatorNodePushable(this, ctx, keyFields, payloadFields, fillFactor, recordDescProvider);
+	}
+	
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
new file mode 100644
index 0000000..58b3265
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+public class BTreeBulkLoadOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+	
+	private final int[] keyFields;
+	private final int[] payloadFields;
+	
+    private float fillFactor;
+    
+    private FrameTupleAccessor accessor;
+    private BTree.BulkLoadContext bulkLoadCtx;
+    
+    private IRecordDescriptorProvider recordDescProvider;
+    
+	public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx, int[] keyFields, int[] payloadFields, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+		super(opDesc, ctx);
+		this.keyFields = keyFields;
+		this.payloadFields = payloadFields;
+		this.fillFactor = fillFactor;
+		this.recordDescProvider = recordDescProvider;
+	}
+	
+	@Override
+	public void close() throws HyracksDataException {
+		try {
+			btree.endBulkLoad(bulkLoadCtx);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}			
+	}
+	
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {		
+		accessor.reset(buffer);
+				
+        // build record for insertion into btree               
+		int tupleCount = accessor.getTupleCount();
+		for(int i = 0; i < tupleCount; i++) {						
+			// determine size of record
+			int btreeRecordSize = 0;			
+			for(int j = 0; j < keyFields.length; j++) {
+				btreeRecordSize += accessor.getFieldLength(i, keyFields[j]);
+			}
+			for(int j = 0; j < payloadFields.length; j++) {
+				btreeRecordSize += accessor.getFieldLength(i, payloadFields[j]);
+			}			
+			
+			MultiComparator cmp = opDesc.getMultiComparator();
+			
+			// allocate record and copy fields
+			byte[] btreeRecord = new byte[btreeRecordSize];
+			int recRunner = 0;
+			for(int j = 0; j < keyFields.length; j++) {
+				int fieldStartOff = accessor.getTupleStartOffset(i) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, keyFields[j]);				
+				int fieldLength = accessor.getFieldLength(i, keyFields[j]);								
+				
+				String rec = cmp.printKey(buffer.array(), fieldStartOff);
+				System.out.println("REC: " + rec);
+				
+				System.arraycopy(buffer.array(), fieldStartOff, btreeRecord, recRunner, fieldLength);				
+				recRunner += fieldLength;
+			}
+			for(int j = 0; j < payloadFields.length; j++) {
+				int fieldStartOff = accessor.getTupleStartOffset(i) + + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, payloadFields[j]);
+				int fieldLength = accessor.getFieldLength(i, payloadFields[j]);
+				System.arraycopy(buffer.array(), fieldStartOff, btreeRecord, recRunner, fieldLength);
+				recRunner += fieldLength;								
+			}			
+											
+			// append to btree
+			try {
+				btree.bulkLoadAddRecord(bulkLoadCtx, btreeRecord);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	
+	@Override
+	public void open() throws HyracksDataException {		
+		RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);		
+		accessor = new FrameTupleAccessor(ctx, recDesc);
+		IBTreeMetaDataFrame metaFrame = new MetaDataFrame();		
+		try {
+			init();
+			btree.open(opDesc.getBtreeFileId());
+			bulkLoadCtx = btree.beginBulkLoad(fillFactor, leafFrame, interiorFrame, metaFrame);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+	
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
new file mode 100644
index 0000000..6172f60
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorDescriptor.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+public class BTreeDiskOrderScanOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+	
+	private static final long serialVersionUID = 1L;
+	
+	public BTreeDiskOrderScanOperatorDescriptor(JobSpecification spec,
+			IFileSplitProvider fileSplitProvider, RecordDescriptor recDesc,
+			IBufferCacheProvider bufferCacheProvider,
+			IBTreeRegistryProvider btreeRegistryProvider, int btreeFileId,
+			String btreeFileName, IBTreeInteriorFrameFactory interiorFactory,
+			IBTreeLeafFrameFactory leafFactory, MultiComparator cmp) {
+		super(spec, 0, 1, fileSplitProvider, recDesc, bufferCacheProvider,
+				btreeRegistryProvider, btreeFileId, btreeFileName, interiorFactory,
+				leafFactory, cmp, null);
+	}
+	
+	@Override
+	public IOperatorNodePushable createPushRuntime(IHyracksContext ctx,
+			IOperatorEnvironment env,
+			IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		return new BTreeDiskOrderScanOperatorNodePushable(this, ctx);
+	}	
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
new file mode 100644
index 0000000..b54952a
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.DiskOrderScanCursor;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+
+public class BTreeDiskOrderScanOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+	
+	public BTreeDiskOrderScanOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
+		super(opDesc, ctx);
+	}
+	
+	@Override
+	public void open() throws HyracksDataException {		
+		
+		IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
+		DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
+		IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+		
+		try {
+			init();				
+			fill();
+			btree.diskOrderScan(cursor, cursorFrame, metaFrame);			
+		} catch (FileNotFoundException e1) {
+			e1.printStackTrace();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		
+		MultiComparator cmp = opDesc.getMultiComparator();
+		ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+		FrameTupleAppender appender = new FrameTupleAppender(ctx);
+		appender.reset(frame, true);
+		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
+		DataOutput dos = tb.getDataOutput();
+		
+		try {
+			while(cursor.hasNext()) {
+				tb.reset();                		
+				cursor.next();
+
+				int recRunner = cursor.getOffset();
+				byte[] array = cursor.getPage().getBuffer().array();
+				for(int i = 0; i < cmp.getFields().length; i++) {
+					int fieldLen = cmp.getFields()[i].getLength(array, recRunner);
+					dos.write(array, recRunner, fieldLen);                			
+					recRunner += fieldLen;
+					tb.addFieldEndOffset();
+				}
+
+				if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+					FrameUtils.flushFrame(frame, writer);
+					appender.reset(frame, true);
+					if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+						throw new IllegalStateException();
+					}
+				}
+
+				//int recOffset = cursor.getOffset();                
+				//String rec = cmp.printRecord(array, recOffset);
+				//System.out.println(rec);
+			}
+
+			if (appender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(frame, writer);
+			}
+			writer.close();
+
+		} catch (Exception e) {					
+			e.printStackTrace();
+		}
+	}
+	
+	@Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+	
+	@Override
+	public void close() throws HyracksDataException {            	
+	}
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistry.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistry.java
new file mode 100644
index 0000000..24b133b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistry.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+
+public class BTreeRegistry {
+    
+    private HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
+    private Lock registryLock = new ReentrantLock();
+    
+    public BTree get(int fileId) {
+        return map.get(fileId);        
+    }
+    
+    // TODO: not very high concurrency, but good enough for now    
+    public void lock() {
+        registryLock.lock();
+    }
+    
+    public void unlock() {
+        registryLock.unlock();
+    }
+    
+    public void register(int fileId, BTree btree) {
+        map.put(fileId, btree);
+    }
+    
+    public void unregister(int fileId) {
+        map.remove(fileId);
+    }        
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistryProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistryProvider.java
new file mode 100644
index 0000000..832f8e9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeRegistryProvider.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+public class BTreeRegistryProvider implements IBTreeRegistryProvider {
+		
+	private static final long serialVersionUID = 1L;
+	
+	private static BTreeRegistry btreeRegistry = null;
+	
+	@Override
+	public BTreeRegistry getBTreeRegistry() {
+		if(btreeRegistry == null) {
+			btreeRegistry = new BTreeRegistry();
+		}		
+		return btreeRegistry;
+	}	
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
new file mode 100644
index 0000000..1e17b12
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -0,0 +1,84 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutputStream;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ByteArrayAccessibleOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.NSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.btree.types.Int32Accessor;
+
+public class BTreeSearchOperatorDescriptor extends AbstractBTreeOperatorDescriptor {
+
+	private static final long serialVersionUID = 1L;
+
+	public BTreeSearchOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider, RecordDescriptor recDesc, IBufferCacheProvider bufferCacheProvider, IBTreeRegistryProvider btreeRegistryProvider,  int btreeFileId, String btreeFileName, IBTreeInteriorFrameFactory interiorFactory, IBTreeLeafFrameFactory leafFactory, MultiComparator cmp, RangePredicate rangePred) {
+		super(spec, 0, 1, fileSplitProvider, recDesc, bufferCacheProvider, btreeRegistryProvider, btreeFileId, btreeFileName, interiorFactory, leafFactory, cmp, rangePred);        
+	}
+
+	@Override
+	public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+			IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+		return new BTreeSearchOperatorNodePushable(this, ctx);
+	}
+	
+	public static void main(String args[]) throws HyracksDataException {
+		
+		IBTreeInteriorFrameFactory interiorFrameFactory = new NSMInteriorFrameFactory();
+		IBTreeLeafFrameFactory leafFrameFactory = new NSMLeafFrameFactory();        
+		
+		IFieldAccessor[] fields = new IFieldAccessor[2];
+		fields[0] = new Int32Accessor(); // key field
+		fields[1] = new Int32Accessor(); // value field
+		
+		int keyLen = 1;
+		IBinaryComparator[] cmps = new IBinaryComparator[keyLen];
+		cmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+		MultiComparator cmp = new MultiComparator(cmps, fields);		
+
+		ByteArrayAccessibleOutputStream lkbaaos = new ByteArrayAccessibleOutputStream();
+		DataOutputStream lkdos = new DataOutputStream(lkbaaos);    	    	    	
+		IntegerSerializerDeserializer.INSTANCE.serialize(-1000, lkdos);
+
+		ByteArrayAccessibleOutputStream hkbaaos = new ByteArrayAccessibleOutputStream();
+		DataOutputStream hkdos = new DataOutputStream(hkbaaos);    	    	    	
+		IntegerSerializerDeserializer.INSTANCE.serialize(1000, hkdos);
+
+		byte[] lowKey = lkbaaos.toByteArray();
+		byte[] highKey = hkbaaos.toByteArray();
+
+		IBinaryComparator[] searchCmps = new IBinaryComparator[1];
+		searchCmps[0] = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+		MultiComparator searchCmp = new MultiComparator(searchCmps, fields);
+
+		RangePredicate rangePred = new RangePredicate(true, lowKey, highKey, searchCmp);
+
+		IBufferCacheProvider bufferCacheProvider = new BufferCacheProvider();
+		IBTreeRegistryProvider btreeRegistryProvider = new BTreeRegistryProvider();
+
+		RecordDescriptor recDesc = new RecordDescriptor(
+				new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+		JobSpecification spec = new JobSpecification();
+		BTreeSearchOperatorDescriptor opDesc = new BTreeSearchOperatorDescriptor(spec, null, recDesc, bufferCacheProvider, btreeRegistryProvider, 0, "/tmp/btreetest.bin", interiorFrameFactory, leafFrameFactory, cmp, rangePred);		 		 
+		IOperatorNodePushable op = opDesc.createPushRuntime(null, null, null, 0, 0);
+		op.open();		 
+	}
+
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
new file mode 100644
index 0000000..88d263e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -0,0 +1,91 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.DataOutput;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeCursor;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
+
+public class BTreeSearchOperatorNodePushable extends AbstractBTreeOperatorNodePushable {
+	
+	public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksContext ctx) {
+		super(opDesc, ctx);
+	}
+		
+	@Override
+	public void open() throws HyracksDataException {		
+		
+		IBTreeLeafFrame cursorFrame = opDesc.getLeafFactory().getFrame();
+		IBTreeCursor cursor = new RangeSearchCursor(cursorFrame);
+		
+		try {
+			init();				
+			fill();
+			btree.search(cursor, opDesc.getRangePredicate(), leafFrame, interiorFrame);
+		} catch (FileNotFoundException e1) {
+			e1.printStackTrace();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		
+		MultiComparator cmp = opDesc.getMultiComparator();
+		ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+		FrameTupleAppender appender = new FrameTupleAppender(ctx);
+		appender.reset(frame, true);
+		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFields().length);
+		DataOutput dos = tb.getDataOutput();
+		
+		try {
+			while(cursor.hasNext()) {
+				tb.reset();                		
+				cursor.next();
+
+				int recRunner = cursor.getOffset();
+				byte[] array = cursor.getPage().getBuffer().array();
+				for(int i = 0; i < cmp.getFields().length; i++) {
+					int fieldLen = cmp.getFields()[i].getLength(array, recRunner);
+					dos.write(array, recRunner, fieldLen);                			
+					recRunner += fieldLen;
+					tb.addFieldEndOffset();
+				}
+
+				if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+					FrameUtils.flushFrame(frame, writer);
+					appender.reset(frame, true);
+					if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+						throw new IllegalStateException();
+					}
+				}
+
+				//int recOffset = cursor.getOffset();                
+				//String rec = cmp.printRecord(array, recOffset);
+				//System.out.println(rec);
+			}
+
+			if (appender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(frame, writer);
+			}
+			writer.close();
+
+		} catch (Exception e) {					
+			e.printStackTrace();
+		}
+	}
+	
+	@Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+	
+	@Override
+	public void close() throws HyracksDataException {            	
+	}			
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BufferCacheProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BufferCacheProvider.java
new file mode 100644
index 0000000..6881cf7
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BufferCacheProvider.java
@@ -0,0 +1,50 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+public class BufferCacheProvider implements IBufferCacheProvider {
+		
+	private static final long serialVersionUID = 1L;
+	
+	private static IBufferCache bufferCache = null;	
+	private static FileManager fileManager = null;
+	private static final int PAGE_SIZE = 8192;
+    private static final int NUM_PAGES = 40;
+	
+	@Override
+	public IBufferCache getBufferCache() {
+		
+		if(bufferCache == null) {
+			if(fileManager == null) fileManager = new FileManager();			
+	        ICacheMemoryAllocator allocator = new BufferAllocator();
+	        IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+	        bufferCache = new BufferCache(allocator, prs, fileManager, PAGE_SIZE, NUM_PAGES);
+		}
+		
+		return bufferCache;
+	}
+	
+	@Override
+	public FileManager getFileManager() {
+		if(fileManager == null) fileManager = new FileManager();
+		return fileManager;
+	}	
+	
+	public class BufferAllocator implements ICacheMemoryAllocator {
+        @Override
+        public ByteBuffer[] allocate(int pageSize, int numPages) {
+            ByteBuffer[] buffers = new ByteBuffer[numPages];
+            for (int i = 0; i < numPages; ++i) {
+                buffers[i] = ByteBuffer.allocate(pageSize);
+            }
+            return buffers;
+        }
+    }	
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeOperatorDescriptor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeOperatorDescriptor.java
new file mode 100644
index 0000000..b25aec2
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeOperatorDescriptor.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+
+public interface IBTreeOperatorDescriptor {
+	public String getBTreeFileName();
+	public int getBTreeFileId();
+	
+	public MultiComparator getMultiComparator();	
+	public RangePredicate getRangePred();
+	
+	public IBTreeInteriorFrameFactory getInteriorFactory();
+	public IBTreeLeafFrameFactory getLeafFactory();
+	
+	public IBufferCacheProvider getBufferCacheProvider();
+	public IBTreeRegistryProvider getBTreeRegistryProvider();
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeRegistryProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeRegistryProvider.java
new file mode 100644
index 0000000..ec9a007
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBTreeRegistryProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.Serializable;
+
+public interface IBTreeRegistryProvider extends Serializable {
+	public BTreeRegistry getBTreeRegistry();
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBufferCacheProvider.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBufferCacheProvider.java
new file mode 100644
index 0000000..6f0edc6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/IBufferCacheProvider.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.storage.am.btree.dataflow;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.FileManager;
+
+public interface IBufferCacheProvider extends Serializable {
+	public IBufferCache getBufferCache();
+	public FileManager getFileManager();
+}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index f1353b1..268aa4a 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -41,6 +41,8 @@
                                         // maxPage
     private final int rootPage = 1; // the root page never changes
     
+    private boolean created = false;
+    
     private final IBufferCache bufferCache;
     private int fileId;
     private final IBTreeInteriorFrameFactory interiorFrameFactory;
@@ -64,6 +66,7 @@
 
     public int usefulCompression = 0;
     public int uselessCompression = 0;
+        
     
     public String printStats() {
         StringBuilder strBuilder = new StringBuilder();
@@ -89,39 +92,56 @@
     }
     
     public void create(int fileId, IBTreeLeafFrame leafFrame, IBTreeMetaDataFrame metaFrame) throws Exception {
-        // initialize meta data page
-        ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
-        pins++;
-        
-        metaNode.acquireWriteLatch();
-        writeLatchesAcquired++;
-        try {
-            metaFrame.setPage(metaNode);
-            metaFrame.initBuffer((byte) -1);
-            metaFrame.setMaxPage(rootPage);
-        } finally {
-            metaNode.releaseWriteLatch();
-            writeLatchesReleased++;
-            bufferCache.unpin(metaNode);
-            unpins++;
-        }
-        
-        // initialize root page
-        ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), true);
-        pins++;
-        
-        rootNode.acquireWriteLatch();
-        writeLatchesAcquired++;
-        try {
-            leafFrame.setPage(rootNode);
-            leafFrame.initBuffer((byte) 0);
-        } finally {
-            rootNode.releaseWriteLatch();
-            writeLatchesReleased++;            
-            bufferCache.unpin(rootNode);
-            unpins++;
-        }
-        currentLevel = 0;
+
+    	if(created) return;
+    	
+    	treeLatch.writeLock().lock();    	
+    	try {
+    	
+    		// check if another thread bet us to it
+    		if(created) return;    		
+    		
+    		// initialize meta data page
+    		ICachedPage metaNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, metaDataPage), false);
+    		pins++;
+
+    		System.out.println(metaNode.getBuffer().capacity());
+
+    		metaNode.acquireWriteLatch();
+    		writeLatchesAcquired++;
+    		try {
+    			metaFrame.setPage(metaNode);
+    			metaFrame.initBuffer((byte) -1);
+    			metaFrame.setMaxPage(rootPage);
+    		} finally {
+    			metaNode.releaseWriteLatch();
+    			writeLatchesReleased++;
+    			bufferCache.unpin(metaNode);
+    			unpins++;
+    		}
+
+    		// initialize root page
+    		ICachedPage rootNode = bufferCache.pin(FileInfo.getDiskPageId(fileId, rootPage), true);
+    		pins++;
+
+    		rootNode.acquireWriteLatch();
+    		writeLatchesAcquired++;
+    		try {
+    			leafFrame.setPage(rootNode);
+    			leafFrame.initBuffer((byte) 0);
+    		} finally {
+    			rootNode.releaseWriteLatch();
+    			writeLatchesReleased++;            
+    			bufferCache.unpin(rootNode);
+    			unpins++;
+    		}
+    		currentLevel = 0;
+    		
+    		created = true;
+    	}
+    	finally {
+    		treeLatch.writeLock().unlock();
+    	}
     }
 
     public void open(int fileId) {
@@ -1237,7 +1257,7 @@
         int spaceNeeded = record.length + ctx.slotSize;
         if (leafFrontier.bytesInserted + spaceNeeded > ctx.leafMaxBytes) {
             int splitKeySize = cmp.getKeySize(leafFrontier.lastRecord, 0);
-            ctx.splitKey.initData(splitKeySize);
+            ctx.splitKey.initData(splitKeySize);            
             System.arraycopy(leafFrontier.lastRecord, 0, ctx.splitKey.getData(), 0, splitKeySize);
             ctx.splitKey.setLeftPage(leafFrontier.pageId);
             int prevPageId = leafFrontier.pageId;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
index fd8a397..243a7a2 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/MultiComparator.java
@@ -15,11 +15,15 @@
 
 package edu.uci.ics.hyracks.storage.am.btree.impls;
 
+import java.io.Serializable;
+
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
 
-public class MultiComparator {
-    
+public class MultiComparator implements Serializable {
+    	
+	private static final long serialVersionUID = 1L;
+	
 	private IBinaryComparator[] cmps = null;
 	private IFieldAccessor[] fields = null;
 	
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
index ffdf9e5..68d7ad2 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -18,6 +18,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.api.ISearchPredicate;
 
 public class RangePredicate implements ISearchPredicate {
+		
+	private static final long serialVersionUID = 1L;
 	
 	protected boolean isForward = true;
 	protected byte[] lowKeys = null;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java
index 6c353cf..804b2f5 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/Int32Accessor.java
@@ -20,7 +20,9 @@
 import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
 
 public class Int32Accessor implements IFieldAccessor {
-	    
+	
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public int getLength(byte[] data, int offset) {
 		return 4;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java
index 85e10f4..243c62c 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/types/StringAccessor.java
@@ -25,21 +25,14 @@
 import edu.uci.ics.hyracks.storage.am.btree.api.IFieldAccessor;
 
 public class StringAccessor implements IFieldAccessor {
-		
+	
+	private static final long serialVersionUID = 1L;
+
 	@Override
 	public int getLength(byte[] data, int offset) {
 		return StringUtils.getUTFLen(data, offset) + 2;
 	}
-	
-	/*
-	@Override
-	public int getLength(byte[] data, int offset) {
-		// TODO: this is very inefficient. We need a getInt() method that works
-		ByteBuffer buf = ByteBuffer.wrap(data);
-		return buf.getInt(offset) + 4; // assuming the first int indicates the length
-	}
-	*/	
-			
+		
 	@Override
 	public String print(byte[] data, int offset) {				
 		ByteBuffer buf = ByteBuffer.wrap(data);