Merged -r 330:354 from trunk

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@355 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/pom.xml b/hyracks-storage-am-btree/pom.xml
index 9a0e810..68c77e2 100644
--- a/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-storage-am-btree/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-am-btree</artifactId>
-  <version>0.1.4-SNAPSHOT</version>
+  <version>0.1.5-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.4-SNAPSHOT</version>
+    <version>0.1.5-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,28 +27,28 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-storage-common</artifactId>
-  		<version>0.1.4-SNAPSHOT</version>
+  		<version>0.1.5-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.1.4-SNAPSHOT</version>
+  		<version>0.1.5-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.4-SNAPSHOT</version>
+  		<version>0.1.5-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-nc</artifactId>
-  		<version>0.1.4-SNAPSHOT</version>
+  		<version>0.1.5-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>  	
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index 1dd69ba..6e13ed7 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -26,56 +26,70 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.MetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 
-public class BTreeBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
-    private float fillFactor;
-    private final BTreeOpHelper btreeOpHelper;
-    private FrameTupleAccessor accessor;
-    private BTree.BulkLoadContext bulkLoadCtx;
+public class BTreeBulkLoadOperatorNodePushable extends
+		AbstractUnaryInputSinkOperatorNodePushable {
+	private float fillFactor;
+	private final BTreeOpHelper btreeOpHelper;
+	private FrameTupleAccessor accessor;
+	private BTree.BulkLoadContext bulkLoadCtx;
 
-    private IRecordDescriptorProvider recordDescProvider;
+	private IRecordDescriptorProvider recordDescProvider;
 
-    private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+	private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
 
-    public BTreeBulkLoadOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksStageletContext ctx,
-            int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
-        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.CREATE_BTREE);
-        this.fillFactor = fillFactor;
-        this.recordDescProvider = recordDescProvider;
-        tuple.setFieldPermutation(fieldPermutation);
-    }
+	public BTreeBulkLoadOperatorNodePushable(
+			AbstractBTreeOperatorDescriptor opDesc,
+			IHyracksStageletContext ctx, int partition, int[] fieldPermutation,
+			float fillFactor, IRecordDescriptorProvider recordDescProvider) {
+		btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition,
+				BTreeOpHelper.BTreeMode.CREATE_BTREE);
+		this.fillFactor = fillFactor;
+		this.recordDescProvider = recordDescProvider;
+		tuple.setFieldPermutation(fieldPermutation);
+	}
 
-    @Override
-    public void open() throws HyracksDataException {
-        AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
-        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
-        accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), recDesc);
-        IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
-        btreeOpHelper.init();
-        btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
-        bulkLoadCtx = btreeOpHelper.getBTree().beginBulkLoad(fillFactor, btreeOpHelper.getLeafFrame(),
-                btreeOpHelper.getInteriorFrame(), metaFrame);
-    }
+	@Override
+	public void open() throws HyracksDataException {
+		AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper
+				.getOperatorDescriptor();
+		RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(
+				opDesc.getOperatorId(), 0);
+		accessor = new FrameTupleAccessor(btreeOpHelper
+				.getHyracksStageletContext().getFrameSize(), recDesc);
+		IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
+		try {
+			btreeOpHelper.init();
+			btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
+			bulkLoadCtx = btreeOpHelper.getBTree().beginBulkLoad(fillFactor,
+					btreeOpHelper.getLeafFrame(),
+					btreeOpHelper.getInteriorFrame(), metaFrame);
+		} catch (Exception e) {
+			// cleanup in case of failure
+			btreeOpHelper.deinit();
+			throw new HyracksDataException(e);
+		}
+	}
 
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
-        int tupleCount = accessor.getTupleCount();
-        for (int i = 0; i < tupleCount; i++) {
-            tuple.reset(accessor, i);
-            btreeOpHelper.getBTree().bulkLoadAddTuple(bulkLoadCtx, tuple);
-        }
-    }
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+		accessor.reset(buffer);
+		int tupleCount = accessor.getTupleCount();
+		for (int i = 0; i < tupleCount; i++) {
+			tuple.reset(accessor, i);
+			btreeOpHelper.getBTree().bulkLoadAddTuple(bulkLoadCtx, tuple);
+		}
+	}
 
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            btreeOpHelper.getBTree().endBulkLoad(bulkLoadCtx);
-        } finally {
-            btreeOpHelper.deinit();
-        }
-    }
+	@Override
+	public void close() throws HyracksDataException {
+		try {
+			btreeOpHelper.getBTree().endBulkLoad(bulkLoadCtx);
+		} finally {
+			btreeOpHelper.deinit();
+		}
+	}
 
-    @Override
-    public void flush() throws HyracksDataException {
-    }
+	@Override
+	public void flush() throws HyracksDataException {
+	}
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index fe71c33..c362183 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -45,50 +45,56 @@
         DiskOrderScanCursor cursor = new DiskOrderScanCursor(cursorFrame);
         IBTreeMetaDataFrame metaFrame = new MetaDataFrame();
 
-        btreeOpHelper.init();
-        btreeOpHelper.getBTree().diskOrderScan(cursor, cursorFrame, metaFrame);
-
-        MultiComparator cmp = btreeOpHelper.getBTree().getMultiComparator();
-        ByteBuffer frame = btreeOpHelper.getHyracksStageletContext().allocateFrame();
-        FrameTupleAppender appender = new FrameTupleAppender(btreeOpHelper.getHyracksStageletContext().getFrameSize());
-        appender.reset(frame, true);
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
-        DataOutput dos = tb.getDataOutput();
-
         try {
-            while (cursor.hasNext()) {
-                tb.reset();
-                cursor.next();
+        
+        	btreeOpHelper.init();
+        	
+        	try {
+        		btreeOpHelper.getBTree().diskOrderScan(cursor, cursorFrame, metaFrame);
 
-                ITupleReference frameTuple = cursor.getTuple();
-                for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-                    dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-                    tb.addFieldEndOffset();
-                }
+        		MultiComparator cmp = btreeOpHelper.getBTree().getMultiComparator();
+        		ByteBuffer frame = btreeOpHelper.getHyracksStageletContext().allocateFrame();
+        		FrameTupleAppender appender = new FrameTupleAppender(btreeOpHelper.getHyracksStageletContext().getFrameSize());
+        		appender.reset(frame, true);
+        		ArrayTupleBuilder tb = new ArrayTupleBuilder(cmp.getFieldCount());
+        		DataOutput dos = tb.getDataOutput();
 
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    FrameUtils.flushFrame(frame, writer);
-                    appender.reset(frame, true);
-                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new IllegalStateException();
-                    }
-                }
-            }
+        		while (cursor.hasNext()) {
+        			tb.reset();
+        			cursor.next();
 
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame, writer);
-            }
+        			ITupleReference frameTuple = cursor.getTuple();
+        			for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+        				dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+        				tb.addFieldEndOffset();
+        			}
 
-            cursor.close();
-            writer.close();
+        			if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+        				FrameUtils.flushFrame(frame, writer);
+        				appender.reset(frame, true);
+        				if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+        					throw new IllegalStateException();
+        				}
+        			}
+        		}
 
-        } catch (Exception e) {
-            e.printStackTrace();
+        		if (appender.getTupleCount() > 0) {
+        			FrameUtils.flushFrame(frame, writer);
+        		}
+        	}
+        	finally {
+        		cursor.close();
+        		writer.close();
+        	}
+
+        } catch(Exception e) {
+        	deinitialize();
+        	throw new HyracksDataException(e);
         }
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        btreeOpHelper.deinit();
+    	btreeOpHelper.deinit();
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
index 23a6601..c087fdd 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDropOperatorNodePushable.java
@@ -15,6 +15,9 @@
 
 package edu.uci.ics.hyracks.storage.am.btree.dataflow;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,7 +30,9 @@
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class BTreeDropOperatorNodePushable extends AbstractOperatorNodePushable {
-    private final IHyracksStageletContext ctx;
+	private static final Logger LOGGER = Logger.getLogger(BTreeDropOperatorNodePushable.class.getName());
+	
+	private final IHyracksStageletContext ctx;
     private IBTreeRegistryProvider btreeRegistryProvider;
     private IStorageManagerInterface storageManager;
     private IFileSplitProvider fileSplitProvider;
@@ -58,30 +63,39 @@
 
     @Override
     public void initialize() throws HyracksDataException {
+    	try {
 
-        BTreeRegistry btreeRegistry = btreeRegistryProvider.getBTreeRegistry(ctx);
-        IBufferCache bufferCache = storageManager.getBufferCache(ctx);
-        IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
+    		BTreeRegistry btreeRegistry = btreeRegistryProvider.getBTreeRegistry(ctx);
+    		IBufferCache bufferCache = storageManager.getBufferCache(ctx);
+    		IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
 
-        FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
+    		FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
 
-        boolean fileIsMapped = fileMapProvider.isMapped(f);
-        if (!fileIsMapped) {
-            throw new HyracksDataException("Cannot drop B-Tree with name " + f.toString() + ". No file mapping exists.");
-        }
+    		boolean fileIsMapped = fileMapProvider.isMapped(f);
+    		if (!fileIsMapped) {    			    			
+    			throw new HyracksDataException("Cannot drop B-Tree with name " + f.toString() + ". No file mapping exists.");
+    		}
 
-        int btreeFileId = fileMapProvider.lookupFileId(f);
+    		int btreeFileId = fileMapProvider.lookupFileId(f);
 
-        // unregister btree instance
-        btreeRegistry.lock();
-        try {
-            btreeRegistry.unregister(btreeFileId);
-        } finally {
-            btreeRegistry.unlock();
-        }
+    		// unregister btree instance
+    		btreeRegistry.lock();
+    		try {
+    			btreeRegistry.unregister(btreeFileId);
+    		} finally {
+    			btreeRegistry.unlock();
+    		}
 
-        // remove name to id mapping
-        bufferCache.deleteFile(btreeFileId);
+    		// remove name to id mapping
+    		bufferCache.deleteFile(btreeFileId);
+    	}
+    	// TODO: for the time being we don't throw,
+		// with proper exception handling (no hanging job problem) we should throw
+    	catch (Exception e) {
+    		if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("BTRee Drop Operator Failed Due To Exception: " + e.getMessage());
+            }
+    	}
     }
 
     @Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
index 7557573..023cd40 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeFileEnlistmentOperatorNodePushable.java
@@ -46,8 +46,12 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        btreeOpHelper.init();
-        btreeOpHelper.deinit();
+        try {
+        	btreeOpHelper.init();
+        }
+        finally {
+        	btreeOpHelper.deinit();
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
index e6ba012..acb7d0f 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertUpdateDeleteOperatorNodePushable.java
@@ -48,14 +48,20 @@
 
     @Override
     public void open() throws HyracksDataException {
-        AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
-        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
-        accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), inputRecDesc);
-        writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
-        btreeOpHelper.init();
-        btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
-        opCtx = btreeOpHelper.getBTree().createOpContext(op, btreeOpHelper.getLeafFrame(),
-                btreeOpHelper.getInteriorFrame(), new MetaDataFrame());
+    	AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
+    	RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+    	accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), inputRecDesc);
+    	writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
+    	try {
+    		btreeOpHelper.init();
+    		btreeOpHelper.getBTree().open(btreeOpHelper.getBTreeFileId());
+    		opCtx = btreeOpHelper.getBTree().createOpContext(op, btreeOpHelper.getLeafFrame(),
+    				btreeOpHelper.getInteriorFrame(), new MetaDataFrame());
+    	} catch(Exception e) {
+    		// cleanup in case of failure
+    		btreeOpHelper.deinit();
+    		throw new HyracksDataException(e);
+    	}
     }
 
     @Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
index d01ceee..880cc25 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeOpHelper.java
@@ -29,23 +29,21 @@
 
 final class BTreeOpHelper {
 
-    public enum BTreeMode {
-        OPEN_BTREE,
-        CREATE_BTREE,
-        ENLIST_BTREE
-    }
+	public enum BTreeMode {
+		OPEN_BTREE, CREATE_BTREE, ENLIST_BTREE
+	}
+	
+	private IBTreeInteriorFrame interiorFrame;
+	private IBTreeLeafFrame leafFrame;
 
-    private IBTreeInteriorFrame interiorFrame;
-    private IBTreeLeafFrame leafFrame;
-
-    private BTree btree;
-    private int btreeFileId = -1;
-    private int partition;
+	private BTree btree;
+	private int btreeFileId = -1;
+	private int partition;
 
     private AbstractBTreeOperatorDescriptor opDesc;
     private IHyracksStageletContext ctx;
 
-    private BTreeMode mode;
+	private BTreeMode mode;
 
     BTreeOpHelper(AbstractBTreeOperatorDescriptor opDesc, final IHyracksStageletContext ctx, int partition,
             BTreeMode mode) {
@@ -55,7 +53,7 @@
         this.partition = partition;
     }
 
-    void init() throws HyracksDataException {
+	void init() throws HyracksDataException {
         IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
         IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
         IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
@@ -63,70 +61,72 @@
         FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
         boolean fileIsMapped = fileMapProvider.isMapped(f);
 
-        switch (mode) {
+		switch (mode) {
+		
+		case OPEN_BTREE: {
+			if (!fileIsMapped) {
+				throw new HyracksDataException(
+						"Trying to open btree from unmapped file " + f.toString());
+			}
+		}
+		break;
 
-            case OPEN_BTREE: {
-                if (!fileIsMapped) {
-                    bufferCache.createFile(f);
-                    // throw new
-                    // HyracksDataException("Trying to open btree from unmapped file "
-                    // + fileName);
-                }
-            }
-                break;
-
-            case CREATE_BTREE:
-            case ENLIST_BTREE: {
-                if (!fileIsMapped) {
-                    bufferCache.createFile(f);
-                }
-            }
-                break;
-
-        }
+		case CREATE_BTREE:
+		case ENLIST_BTREE: {
+			if (!fileIsMapped) {
+				bufferCache.createFile(f);
+			}
+		}
+		break;
+		
+		}
 
         btreeFileId = fileMapProvider.lookupFileId(f);
         bufferCache.openFile(btreeFileId);
 
-        interiorFrame = opDesc.getInteriorFactory().getFrame();
-        leafFrame = opDesc.getLeafFactory().getFrame();
+		interiorFrame = opDesc.getInteriorFactory().getFrame();
+		leafFrame = opDesc.getLeafFactory().getFrame();
 
         BTreeRegistry btreeRegistry = opDesc.getBtreeRegistryProvider().getBTreeRegistry(ctx);
         btree = btreeRegistry.get(btreeFileId);
         if (btree == null) {
 
-            // create new btree and register it
-            btreeRegistry.lock();
-            try {
-                // check if btree has already been registered by another thread
-                btree = btreeRegistry.get(btreeFileId);
-                if (btree == null) {
-                    // this thread should create and register the btree
+			// create new btree and register it
+			btreeRegistry.lock();
+			try {
+				// check if btree has already been registered by another thread
+				btree = btreeRegistry.get(btreeFileId);
+				if (btree == null) {
+					// this thread should create and register the btree
 
-                    IBinaryComparator[] comparators = new IBinaryComparator[opDesc.getComparatorFactories().length];
-                    for (int i = 0; i < opDesc.getComparatorFactories().length; i++) {
-                        comparators[i] = opDesc.getComparatorFactories()[i].createBinaryComparator();
-                    }
+					IBinaryComparator[] comparators = new IBinaryComparator[opDesc
+							.getComparatorFactories().length];
+					for (int i = 0; i < opDesc.getComparatorFactories().length; i++) {
+						comparators[i] = opDesc.getComparatorFactories()[i]
+								.createBinaryComparator();
+					}
 
-                    MultiComparator cmp = new MultiComparator(opDesc.getTypeTraits(), comparators);
+					MultiComparator cmp = new MultiComparator(opDesc
+							.getTypeTraits(), comparators);
 
-                    btree = new BTree(bufferCache, opDesc.getInteriorFactory(), opDesc.getLeafFactory(), cmp);
-                    if (mode == BTreeMode.CREATE_BTREE) {
-                        MetaDataFrame metaFrame = new MetaDataFrame();
-                        try {
-                            btree.create(btreeFileId, leafFrame, metaFrame);
-                            btree.open(btreeFileId);
-                        } catch (Exception e) {
-                            throw new HyracksDataException(e);
-                        }
-                    }
-                    btreeRegistry.register(btreeFileId, btree);
-                }
-            } finally {
-                btreeRegistry.unlock();
-            }
-        }
-    }
+					btree = new BTree(bufferCache, opDesc.getInteriorFactory(),
+							opDesc.getLeafFactory(), cmp);
+					if (mode == BTreeMode.CREATE_BTREE) {
+						MetaDataFrame metaFrame = new MetaDataFrame();
+						try {
+							btree.create(btreeFileId, leafFrame, metaFrame);
+							btree.open(btreeFileId);
+						} catch (Exception e) {
+							throw new HyracksDataException(e);
+						}
+					}
+					btreeRegistry.register(btreeFileId, btree);
+				}
+			} finally {
+				btreeRegistry.unlock();
+			}
+		}
+	}
 
     public void deinit() throws HyracksDataException {
         if (btreeFileId != -1) {
@@ -135,27 +135,27 @@
         }
     }
 
-    public BTree getBTree() {
-        return btree;
-    }
+	public BTree getBTree() {
+		return btree;
+	}
 
     public IHyracksStageletContext getHyracksStageletContext() {
         return ctx;
     }
 
-    public AbstractBTreeOperatorDescriptor getOperatorDescriptor() {
-        return opDesc;
-    }
+	public AbstractBTreeOperatorDescriptor getOperatorDescriptor() {
+		return opDesc;
+	}
 
-    public IBTreeLeafFrame getLeafFrame() {
-        return leafFrame;
-    }
+	public IBTreeLeafFrame getLeafFrame() {
+		return leafFrame;
+	}
 
-    public IBTreeInteriorFrame getInteriorFrame() {
-        return interiorFrame;
-    }
+	public IBTreeInteriorFrame getInteriorFrame() {
+		return interiorFrame;
+	}
 
-    public int getBTreeFileId() {
-        return btreeFileId;
-    }
+	public int getBTreeFileId() {
+		return btreeFileId;
+	}
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index eeee17d..415f169 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -37,165 +37,194 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
 
-public class BTreeSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-    private BTreeOpHelper btreeOpHelper;
-    private FrameTupleAccessor accessor;
+public class BTreeSearchOperatorNodePushable extends
+		AbstractUnaryInputUnaryOutputOperatorNodePushable {
+	private BTreeOpHelper btreeOpHelper;
+	private FrameTupleAccessor accessor;
 
-    private ByteBuffer writeBuffer;
-    private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
+	private ByteBuffer writeBuffer;
+	private FrameTupleAppender appender;
+	private ArrayTupleBuilder tb;
+	private DataOutput dos;
 
-    private BTree btree;
-    private boolean isForward;
-    private PermutingFrameTupleReference lowKey;
-    private PermutingFrameTupleReference highKey;
-    private boolean lowKeyInclusive;
-    private boolean highKeyInclusive;
-    private RangePredicate rangePred;
-    private MultiComparator lowKeySearchCmp;
-    private MultiComparator highKeySearchCmp;
-    private IBTreeCursor cursor;
-    private IBTreeLeafFrame cursorFrame;
-    private BTreeOpContext opCtx;
+	private BTree btree;
+	private boolean isForward;
+	private PermutingFrameTupleReference lowKey;
+	private PermutingFrameTupleReference highKey;
+	private boolean lowKeyInclusive;
+	private boolean highKeyInclusive;
+	private RangePredicate rangePred;
+	private MultiComparator lowKeySearchCmp;
+	private MultiComparator highKeySearchCmp;
+	private IBTreeCursor cursor;
+	private IBTreeLeafFrame cursorFrame;
+	private BTreeOpContext opCtx;
 
-    private RecordDescriptor recDesc;
+	private RecordDescriptor recDesc;
 
-    public BTreeSearchOperatorNodePushable(AbstractBTreeOperatorDescriptor opDesc, IHyracksStageletContext ctx,
-            int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward, int[] lowKeyFields,
-            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
-        btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition, BTreeOpHelper.BTreeMode.OPEN_BTREE);
-        this.isForward = isForward;
-        this.lowKeyInclusive = lowKeyInclusive;
-        this.highKeyInclusive = highKeyInclusive;
-        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
-        if (lowKeyFields != null && lowKeyFields.length > 0) {
-            lowKey = new PermutingFrameTupleReference();
-            lowKey.setFieldPermutation(lowKeyFields);
-        }
-        if (highKeyFields != null && highKeyFields.length > 0) {
-            highKey = new PermutingFrameTupleReference();
-            highKey.setFieldPermutation(highKeyFields);
-        }
-    }
+	public BTreeSearchOperatorNodePushable(
+			AbstractBTreeOperatorDescriptor opDesc,
+			IHyracksStageletContext ctx, int partition,
+			IRecordDescriptorProvider recordDescProvider, boolean isForward,
+			int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
+			boolean highKeyInclusive) {
+		btreeOpHelper = new BTreeOpHelper(opDesc, ctx, partition,
+				BTreeOpHelper.BTreeMode.OPEN_BTREE);
+		this.isForward = isForward;
+		this.lowKeyInclusive = lowKeyInclusive;
+		this.highKeyInclusive = highKeyInclusive;
+		this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc
+				.getOperatorId(), 0);
+		if (lowKeyFields != null && lowKeyFields.length > 0) {
+			lowKey = new PermutingFrameTupleReference();
+			lowKey.setFieldPermutation(lowKeyFields);
+		}
+		if (highKeyFields != null && highKeyFields.length > 0) {
+			highKey = new PermutingFrameTupleReference();
+			highKey.setFieldPermutation(highKeyFields);
+		}
+	}
 
-    @Override
-    public void open() throws HyracksDataException {
-        AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper.getOperatorDescriptor();
-        accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), recDesc);
+	@Override
+	public void open() throws HyracksDataException {
+		AbstractBTreeOperatorDescriptor opDesc = btreeOpHelper
+				.getOperatorDescriptor();
+		accessor = new FrameTupleAccessor(btreeOpHelper
+				.getHyracksStageletContext().getFrameSize(), recDesc);
 
-        cursorFrame = opDesc.getLeafFactory().getFrame();
-        cursor = new RangeSearchCursor(cursorFrame);
+		cursorFrame = opDesc.getLeafFactory().getFrame();
+		cursor = new RangeSearchCursor(cursorFrame);
 
-        btreeOpHelper.init();
-        btree = btreeOpHelper.getBTree();
+		try {
 
-        // construct range predicate
+			btreeOpHelper.init();
+			btree = btreeOpHelper.getBTree();
 
-        int lowKeySearchFields = btree.getMultiComparator().getComparators().length;
-        int highKeySearchFields = btree.getMultiComparator().getComparators().length;
-        if (lowKey != null)
-            lowKeySearchFields = lowKey.getFieldCount();
-        if (highKey != null)
-            highKeySearchFields = highKey.getFieldCount();
+			// construct range predicate
 
-        IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
-        for (int i = 0; i < lowKeySearchFields; i++) {
-            lowKeySearchComparators[i] = btree.getMultiComparator().getComparators()[i];
-        }
-        lowKeySearchCmp = new MultiComparator(btree.getMultiComparator().getTypeTraits(), lowKeySearchComparators);
+			int lowKeySearchFields = btree.getMultiComparator()
+					.getComparators().length;
+			int highKeySearchFields = btree.getMultiComparator()
+					.getComparators().length;
+			if (lowKey != null)
+				lowKeySearchFields = lowKey.getFieldCount();
+			if (highKey != null)
+				highKeySearchFields = highKey.getFieldCount();
 
-        if (lowKeySearchFields == highKeySearchFields) {
-            highKeySearchCmp = lowKeySearchCmp;
-        } else {
-            IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
-            for (int i = 0; i < highKeySearchFields; i++) {
-                highKeySearchComparators[i] = btree.getMultiComparator().getComparators()[i];
-            }
-            highKeySearchCmp = new MultiComparator(btree.getMultiComparator().getTypeTraits(), highKeySearchComparators);
+			IBinaryComparator[] lowKeySearchComparators = new IBinaryComparator[lowKeySearchFields];
+			for (int i = 0; i < lowKeySearchFields; i++) {
+				lowKeySearchComparators[i] = btree.getMultiComparator()
+						.getComparators()[i];
+			}
+			lowKeySearchCmp = new MultiComparator(btree.getMultiComparator()
+					.getTypeTraits(), lowKeySearchComparators);
 
-        }
+			if (lowKeySearchFields == highKeySearchFields) {
+				highKeySearchCmp = lowKeySearchCmp;
+			} else {
+				IBinaryComparator[] highKeySearchComparators = new IBinaryComparator[highKeySearchFields];
+				for (int i = 0; i < highKeySearchFields; i++) {
+					highKeySearchComparators[i] = btree.getMultiComparator()
+							.getComparators()[i];
+				}
+				highKeySearchCmp = new MultiComparator(btree
+						.getMultiComparator().getTypeTraits(),
+						highKeySearchComparators);
 
-        rangePred = new RangePredicate(isForward, null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
-                highKeySearchCmp);
+			}
 
-        accessor = new FrameTupleAccessor(btreeOpHelper.getHyracksStageletContext().getFrameSize(), recDesc);
+			rangePred = new RangePredicate(isForward, null, null,
+					lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
+					highKeySearchCmp);
 
-        writeBuffer = btreeOpHelper.getHyracksStageletContext().allocateFrame();
-        tb = new ArrayTupleBuilder(btree.getMultiComparator().getFieldCount());
-        dos = tb.getDataOutput();
-        appender = new FrameTupleAppender(btreeOpHelper.getHyracksStageletContext().getFrameSize());
-        appender.reset(writeBuffer, true);
+			accessor = new FrameTupleAccessor(btreeOpHelper
+					.getHyracksStageletContext().getFrameSize(), recDesc);
 
-        opCtx = btree.createOpContext(BTreeOp.BTO_SEARCH, btreeOpHelper.getLeafFrame(),
-                btreeOpHelper.getInteriorFrame(), null);
-    }
+			writeBuffer = btreeOpHelper.getHyracksStageletContext()
+					.allocateFrame();
+			tb = new ArrayTupleBuilder(btree.getMultiComparator()
+					.getFieldCount());
+			dos = tb.getDataOutput();
+			appender = new FrameTupleAppender(btreeOpHelper
+					.getHyracksStageletContext().getFrameSize());
+			appender.reset(writeBuffer, true);
 
-    private void writeSearchResults() throws Exception {
-        while (cursor.hasNext()) {
-            tb.reset();
-            cursor.next();
+			opCtx = btree.createOpContext(BTreeOp.BTO_SEARCH, btreeOpHelper
+					.getLeafFrame(), btreeOpHelper.getInteriorFrame(), null);
 
-            ITupleReference frameTuple = cursor.getTuple();
-            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-                tb.addFieldEndOffset();
-            }
+		} catch (Exception e) {
+			btreeOpHelper.deinit();
+		}
+	}
 
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-                appender.reset(writeBuffer, true);
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    throw new IllegalStateException();
-                }
-            }
-        }
-    }
+	private void writeSearchResults() throws Exception {
+		while (cursor.hasNext()) {
+			tb.reset();
+			cursor.next();
 
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
+			ITupleReference frameTuple = cursor.getTuple();
+			for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+				dos.write(frameTuple.getFieldData(i), frameTuple
+						.getFieldStart(i), frameTuple.getFieldLength(i));
+				tb.addFieldEndOffset();
+			}
 
-        int tupleCount = accessor.getTupleCount();
-        try {
-            for (int i = 0; i < tupleCount; i++) {
-                if (lowKey != null)
-                    lowKey.reset(accessor, i);
-                if (highKey != null)
-                    highKey.reset(accessor, i);
-                rangePred.setLowKey(lowKey, lowKeyInclusive);
-                rangePred.setHighKey(highKey, highKeyInclusive);
+			if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0,
+					tb.getSize())) {
+				FrameUtils.flushFrame(writeBuffer, writer);
+				appender.reset(writeBuffer, true);
+				if (!appender.append(tb.getFieldEndOffsets(),
+						tb.getByteArray(), 0, tb.getSize())) {
+					throw new IllegalStateException();
+				}
+			}
+		}
+	}
 
-                cursor.reset();
-                btree.search(cursor, rangePred, opCtx);
-                writeSearchResults();
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+		accessor.reset(buffer);
 
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
-            writer.close();
-            try {
-                cursor.close();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-        } finally {
-            btreeOpHelper.deinit();
-        }
-    }
+		int tupleCount = accessor.getTupleCount();
+		try {
+			for (int i = 0; i < tupleCount; i++) {
+				if (lowKey != null)
+					lowKey.reset(accessor, i);
+				if (highKey != null)
+					highKey.reset(accessor, i);
+				rangePred.setLowKey(lowKey, lowKeyInclusive);
+				rangePred.setHighKey(highKey, highKeyInclusive);
 
-    @Override
-    public void flush() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            FrameUtils.flushFrame(writeBuffer, writer);
-        }
-    }
+				cursor.reset();
+				btree.search(cursor, rangePred, opCtx);
+				writeSearchResults();
+			}
+		} catch (Exception e) {
+			throw new HyracksDataException(e);
+		}
+	}
+
+	@Override
+	public void close() throws HyracksDataException {
+		try {
+			if (appender.getTupleCount() > 0) {
+				FrameUtils.flushFrame(writeBuffer, writer);
+			}
+			writer.close();
+			try {
+				cursor.close();
+			} catch (Exception e) {
+				throw new HyracksDataException(e);
+			}
+		} finally {
+			btreeOpHelper.deinit();
+		}
+	}
+
+	@Override
+	public void flush() throws HyracksDataException {
+		if (appender.getTupleCount() > 0) {
+			FrameUtils.flushFrame(writeBuffer, writer);
+		}
+	}
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java
index 433cc6a..2ef1905 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/RangeSearchCursor.java
@@ -60,8 +60,10 @@
         if (page != null) {
             page.releaseReadLatch();
             bufferCache.unpin(page);
-            page = null;
         }
+        tupleIndex = 0;
+		page = null;
+		pred = null;
     }
 
     public ITupleReference getTuple() {
@@ -217,9 +219,11 @@
 
     @Override
     public void reset() {
-        tupleIndex = 0;
-        page = null;
-        pred = null;
+        try {
+			close();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}		
     }
 
     @Override