more dataflow helper changes and cleanup to ease the introduction of index lifecycle management

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1711 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
index eaf1be1..f307c8a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
@@ -30,12 +30,11 @@
     }
 
     @Override
-    public ITreeIndex createIndexInstance() throws HyracksDataException {
+    public ITreeIndex getIndexInstance() throws HyracksDataException {
         try {
             return BTreeUtils.createBTree(opDesc.getStorageManager().getBufferCache(ctx), opDesc.getStorageManager()
                     .getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc
-                    .getTreeIndexComparatorFactories(), BTreeLeafFrameType.REGULAR_NSM, opDesc.getFileSplitProvider()
-                    .getFileSplits()[partition].getLocalFile());
+                    .getTreeIndexComparatorFactories(), BTreeLeafFrameType.REGULAR_NSM, file);
         } catch (BTreeException e) {
             throw new HyracksDataException(e);
         }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexDataflowHelper.java
new file mode 100644
index 0000000..92cf775
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.storage.am.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+
+public interface IIndexDataflowHelper {
+    public IIndex getIndexInstance() throws HyracksDataException;
+
+    public FileReference getFileReference();
+
+    public long getResourceID();
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
index ddca470..71760c9 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
@@ -18,8 +18,9 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
 
 public interface IIndexDataflowHelperFactory extends Serializable {
-    public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc,
+    public IIndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc,
             final IHyracksTaskContext ctx, int partition);
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 3d29b06..3b9a356 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -17,61 +17,59 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.common.file.IIndexArtifactMap;
 
-public abstract class IndexDataflowHelper {
-    protected final int partition;
+public abstract class IndexDataflowHelper implements IIndexDataflowHelper {
+
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
     protected final IIndexArtifactMap indexArtifactMap;
+    protected final IndexRegistry<IIndex> indexRegistry;
+    protected final FileReference file;
 
     protected IIndex index;
-    protected long resourceID = -1;
+    protected String baseDir;
+    protected String fullDir;
 
     public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final IHyracksTaskContext ctx, int partition) {
         this.opDesc = opDesc;
         this.ctx = ctx;
-        this.partition = partition;
         this.indexArtifactMap = opDesc.getStorageManager().getIndexArtifactMap(ctx);
-    }
+        this.indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
+        this.file = opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile();
 
-    public void init(boolean forceCreate) throws HyracksDataException {
-        IndexRegistry<IIndex> indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
-        FileReference file = getFilereference();
-        List<IODeviceHandle> ioDeviceHandles = ctx.getIOManager().getIODevices();
-        String fullDir = ioDeviceHandles.get(0).getPath().toString();
+        fullDir = ctx.getIOManager().getIODevices().get(0).getPath().toString();
         if (!fullDir.endsWith(File.separator)) {
             fullDir += File.separator;
         }
-        String baseDir = file.getFile().getPath();
+        baseDir = file.getFile().getPath();
         if (!baseDir.endsWith(File.separator)) {
             baseDir += File.separator;
         }
         fullDir += baseDir;
+    }
 
-        resourceID = indexArtifactMap.get(fullDir);
-
+    public void init(boolean forceCreate) throws HyracksDataException {
+        long resourceID = getResourceID();
         // Create new index instance and register it.
         synchronized (indexRegistry) {
             // Check if the index has already been registered.
             boolean register = false;
             index = indexRegistry.get(resourceID);
             if (index == null) {
-                index = createIndexInstance();
+                index = getIndexInstance();
                 register = true;
             }
             if (forceCreate) {
                 index.create();
                 // Create new resourceId
                 try {
-                    resourceID = indexArtifactMap.create(baseDir, ioDeviceHandles);
+                    resourceID = indexArtifactMap.create(baseDir, ctx.getIOManager().getIODevices());
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }
@@ -83,29 +81,18 @@
         }
     }
 
-    public abstract IIndex createIndexInstance() throws HyracksDataException;
+    public abstract IIndex getIndexInstance() throws HyracksDataException;
 
-    public FileReference getFilereference() {
-        IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
-        return fileSplitProvider.getFileSplits()[partition].getLocalFile();
-    }
-
-    public void deinit() throws HyracksDataException {
+    @Override
+    public FileReference getFileReference() {
+        return file;
     }
 
     public IIndex getIndex() {
         return index;
     }
 
-    public IHyracksTaskContext getHyracksTaskContext() {
-        return ctx;
-    }
-
-    public IIndexOperatorDescriptor getOperatorDescriptor() {
-        return opDesc;
-    }
-
     public long getResourceID() {
-        return resourceID;
+        return indexArtifactMap.get(fullDir);
     }
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index bb0689a..1289165 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -26,6 +26,8 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 
 public class TreeIndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+    private final AbstractTreeIndexOperatorDescriptor opDesc;
+    private final IHyracksTaskContext ctx;
     private float fillFactor;
     private final TreeIndexDataflowHelper treeIndexHelper;
     private FrameTupleAccessor accessor;
@@ -38,8 +40,10 @@
 
     public TreeIndexBulkLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
-        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition);
+        this.opDesc = opDesc;
+        this.ctx = ctx;
+        this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+                .createIndexDataflowHelper(opDesc, ctx, partition);
         this.fillFactor = fillFactor;
         this.recordDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
@@ -47,18 +51,14 @@
 
     @Override
     public void open() throws HyracksDataException {
-        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
-                .getOperatorDescriptor();
         RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
         try {
             treeIndexHelper.init(false);
             treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
             bulkLoader = treeIndex.createBulkLoader(fillFactor);
         } catch (Exception e) {
             // cleanup in case of failure
-            System.out.println("help");
-            treeIndexHelper.deinit();
             throw new HyracksDataException(e);
         }
     }
@@ -79,8 +79,6 @@
             bulkLoader.end();
         } catch (Exception e) {
             throw new HyracksDataException(e);
-        } finally {
-            treeIndexHelper.deinit();
         }
     }
 
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java
index 21348a0..8ad22db 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java
@@ -46,11 +46,7 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        try {
-        	treeIndexHelper.init(true);
-        } finally {
-        	treeIndexHelper.deinit();
-        }
+        treeIndexHelper.init(true);
     }
 
     @Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
index 10d1077..6548261 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
@@ -17,7 +17,6 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.impls.TreeDiskOrderScanCursor;
@@ -30,7 +29,7 @@
         this.treeOpDesc = (ITreeIndexOperatorDescriptor) opDesc;
     }
 
-    public abstract ITreeIndex createIndexInstance() throws HyracksDataException;
+    public abstract IIndex getIndexInstance() throws HyracksDataException;
 
     public ITreeIndexCursor createDiskOrderScanCursor(ITreeIndexFrame leafFrame) throws HyracksDataException {
         return new TreeDiskOrderScanCursor(leafFrame);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 3c67fcd..8ffe1df 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -33,12 +33,14 @@
 
 public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
     private final AbstractTreeIndexOperatorDescriptor opDesc;
+    private final IHyracksTaskContext ctx;
     private final TreeIndexDataflowHelper treeIndexHelper;
     private ITreeIndex treeIndex;
 
     public TreeIndexDiskOrderScanOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition) {
         this.opDesc = opDesc;
+        this.ctx = ctx;
         this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
                 .createIndexDataflowHelper(opDesc, ctx, partition);
     }
@@ -58,9 +60,8 @@
             try {
                 indexAccessor.diskOrderScan(cursor);
                 int fieldCount = treeIndex.getFieldCount();
-                ByteBuffer frame = treeIndexHelper.getHyracksTaskContext().allocateFrame();
-                FrameTupleAppender appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext()
-                        .getFrameSize());
+                ByteBuffer frame = ctx.allocateFrame();
+                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 appender.reset(frame, true);
                 ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
                 DataOutput dos = tb.getDataOutput();
@@ -101,6 +102,5 @@
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        treeIndexHelper.deinit();
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index d7a4b99..07a2726 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -33,6 +33,8 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 
 public class TreeIndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final AbstractTreeIndexOperatorDescriptor opDesc;
+    private final IHyracksTaskContext ctx;
     private final TreeIndexDataflowHelper treeIndexHelper;
     private FrameTupleAccessor accessor;
     private final IRecordDescriptorProvider recordDescProvider;
@@ -47,6 +49,8 @@
     public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
             IRecordDescriptorProvider recordDescProvider, IndexOp op) {
+        this.opDesc = opDesc;
+        this.ctx = ctx;
         this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
                 .createIndexDataflowHelper(opDesc, ctx, partition);
         this.recordDescProvider = recordDescProvider;
@@ -56,11 +60,9 @@
 
     @Override
     public void open() throws HyracksDataException {
-        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
-                .getOperatorDescriptor();
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
-        writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+        writeBuffer = ctx.allocateFrame();
         writer.open();
         try {
             treeIndexHelper.init(false);
@@ -74,8 +76,6 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            // cleanup in case of failure
-            treeIndexHelper.deinit();
             throw new HyracksDataException(e);
         }
     }
@@ -130,11 +130,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            writer.close();
-        } finally {
-            treeIndexHelper.deinit();
-        }
+        writer.close();
     }
 
     @Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index a49e9b3..3451d08 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -38,6 +38,7 @@
 
 public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final AbstractTreeIndexOperatorDescriptor opDesc;
+    protected final IHyracksTaskContext ctx;
     protected final TreeIndexDataflowHelper treeIndexHelper;
     protected FrameTupleAccessor accessor;
 
@@ -59,9 +60,10 @@
     public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
+        this.ctx = ctx;
         this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
                 .createIndexDataflowHelper(opDesc, ctx, partition);
-        this.retainInput = treeIndexHelper.getOperatorDescriptor().getRetainInput();
+        this.retainInput = opDesc.getRetainInput();
         this.inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
     }
 
@@ -75,17 +77,17 @@
 
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
         writer.open();
         try {
             treeIndexHelper.init(false);
             treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
             cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
             searchPred = createSearchPredicate();
-            writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+            writeBuffer = ctx.allocateFrame();
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+            appender = new FrameTupleAppender(ctx.getFrameSize());
             appender.reset(writeBuffer, true);
             ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
                     treeIndexHelper.getResourceID());
@@ -95,7 +97,6 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            treeIndexHelper.deinit();
             throw new HyracksDataException(e);
         }
     }
@@ -144,18 +145,14 @@
 
     @Override
     public void close() throws HyracksDataException {
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(writeBuffer, writer);
+        }
+        writer.close();
         try {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
-            writer.close();
-            try {
-                cursor.close();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-        } finally {
-            treeIndexHelper.deinit();
+            cursor.close();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
         }
     }
 
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index df82359..b47049e 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -32,15 +32,18 @@
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
-    private final TreeIndexDataflowHelper treeIndexHelper;
+    private final AbstractTreeIndexOperatorDescriptor opDesc;
     private final IHyracksTaskContext ctx;
+    private final TreeIndexDataflowHelper treeIndexHelper;
     private TreeIndexStatsGatherer statsGatherer;
 
     public TreeIndexStatsOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition) {
-        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition);
+        this.opDesc = opDesc;
         this.ctx = ctx;
+        this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+                .createIndexDataflowHelper(opDesc, ctx, partition);
+
     }
 
     @Override
@@ -58,10 +61,9 @@
             writer.open();
             treeIndexHelper.init(false);
             ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
-            IBufferCache bufferCache = treeIndexHelper.getOperatorDescriptor().getStorageManager().getBufferCache(ctx);
-            IFileMapProvider fileMapProvider = treeIndexHelper.getOperatorDescriptor().getStorageManager()
-                    .getFileMapProvider(ctx);
-            int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFilereference());
+            IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
+            IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+            int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFileReference());
             statsGatherer = new TreeIndexStatsGatherer(bufferCache, treeIndex.getFreePageManager(), indexFileId,
                     treeIndex.getRootPageId());
             TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(), treeIndex
@@ -81,11 +83,7 @@
             }
             FrameUtils.flushFrame(frame, writer);
         } catch (Exception e) {
-            try {
-                treeIndexHelper.deinit();
-            } finally {
-                writer.fail();
-            }
+            writer.fail();
         } finally {
             writer.close();
         }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index 3642da0..9b9ff2c 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -24,10 +24,13 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
 
 public class InvertedIndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+    private final AbstractInvertedIndexOperatorDescriptor opDesc;
+    private final IHyracksTaskContext ctx;
     private final InvertedIndexDataflowHelper invIndexDataflowHelper;
     private InvertedIndex invIndex;
     private IIndexBulkLoader bulkLoader;
@@ -39,6 +42,8 @@
 
     public InvertedIndexBulkLoadOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider) {
+        this.opDesc = opDesc;
+        this.ctx = ctx;
         this.invIndexDataflowHelper = new InvertedIndexDataflowHelper(opDesc, ctx, partition);
         this.recordDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
@@ -46,25 +51,17 @@
 
     @Override
     public void open() throws HyracksDataException {
-        AbstractInvertedIndexOperatorDescriptor opDesc = (AbstractInvertedIndexOperatorDescriptor) invIndexDataflowHelper
-                .getOperatorDescriptor();
         RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(invIndexDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
 
         // Inverted Index.
-        try {
-            invIndexDataflowHelper.init(false);
-            invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
+        invIndexDataflowHelper.init(false);
+        invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
 
+        try {
             bulkLoader = invIndex.createBulkLoader(BTree.DEFAULT_FILL_FACTOR);
-        } catch (Exception e) {
-            // Cleanup in case of failure.
-            invIndexDataflowHelper.deinit();
-            if (e instanceof HyracksDataException) {
-                throw (HyracksDataException) e;
-            } else {
-                throw new HyracksDataException(e);
-            }
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
         }
     }
 
@@ -80,13 +77,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            bulkLoader.end();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        } finally {
-            invIndexDataflowHelper.deinit();
-        }
+        bulkLoader.end();
     }
 
     @Override
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java
index bd25746..ef8ef42 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java
@@ -44,12 +44,7 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        // Inverted Index.
-        try {
-            invIndexDataflowHelper.init(true);
-        } finally {
-            invIndexDataflowHelper.deinit();
-        }
+        invIndexDataflowHelper.init(true);
     }
 
     @Override
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
index 7409b2a..969e099 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
@@ -16,8 +16,6 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
@@ -32,21 +30,14 @@
         super(opDesc, ctx, partition);
     }
 
-    public FileReference getFilereference() {
-        AbstractInvertedIndexOperatorDescriptor invIndexOpDesc = (AbstractInvertedIndexOperatorDescriptor) opDesc;
-        IFileSplitProvider fileSplitProvider = invIndexOpDesc.getInvListsFileSplitProvider();
-        return fileSplitProvider.getFileSplits()[partition].getLocalFile();
-    }
-
     @Override
-    public IIndex createIndexInstance() throws HyracksDataException {
+    public IIndex getIndexInstance() throws HyracksDataException {
         IInvertedIndexOperatorDescriptor invIndexOpDesc = (IInvertedIndexOperatorDescriptor) opDesc;
         IInvertedListBuilder invListBuilder = new FixedSizeElementInvertedListBuilder(
                 invIndexOpDesc.getInvListsTypeTraits());
         return new InvertedIndex(opDesc.getStorageManager().getBufferCache(ctx), opDesc.getStorageManager()
                 .getFileMapProvider(ctx), invListBuilder, invIndexOpDesc.getInvListsTypeTraits(),
                 invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTreeIndexTypeTraits(),
-                invIndexOpDesc.getTreeIndexComparatorFactories(),
-                opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile());
+                invIndexOpDesc.getTreeIndexComparatorFactories(), file);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
index 4b932b1..b9dd1ea 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
@@ -38,6 +38,8 @@
 import edu.uci.ics.hyracks.storage.am.invertedindex.impls.OccurrenceThresholdPanicException;
 
 public class InvertedIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final AbstractInvertedIndexOperatorDescriptor opDesc;
+    private final IHyracksTaskContext ctx;
     private final InvertedIndexDataflowHelper invIndexDataflowHelper;
     private final int queryField;
     private FrameTupleAccessor accessor;
@@ -54,45 +56,34 @@
     private ArrayTupleBuilder tb;
     private DataOutput dos;
 
-    private final AbstractInvertedIndexOperatorDescriptor opDesc;
     private final boolean retainInput;
 
     public InvertedIndexSearchOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, int queryField, IInvertedIndexSearchModifier searchModifier,
             IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
+        this.ctx = ctx;
         this.invIndexDataflowHelper = new InvertedIndexDataflowHelper(opDesc, ctx, partition);
         this.queryField = queryField;
         this.searchPred = new InvertedIndexSearchPredicate(opDesc.getTokenizerFactory().createTokenizer(),
                 searchModifier);
         this.recordDescProvider = recordDescProvider;
-        this.retainInput = invIndexDataflowHelper.getOperatorDescriptor().getRetainInput();
+        this.retainInput = opDesc.getRetainInput();
     }
 
     @Override
     public void open() throws HyracksDataException {
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(invIndexDataflowHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
         tuple = new FrameTupleReference();
 
-        // Inverted Index.
-        try {
-            invIndexDataflowHelper.init(false);
-            invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
-        } catch (Exception e) {
-            // Cleanup in case of failure.
-            invIndexDataflowHelper.deinit();
-            if (e instanceof HyracksDataException) {
-                throw (HyracksDataException) e;
-            } else {
-                throw new HyracksDataException(e);
-            }
-        }
+        invIndexDataflowHelper.init(false);
+        invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
 
-        writeBuffer = invIndexDataflowHelper.getHyracksTaskContext().allocateFrame();
+        writeBuffer = ctx.allocateFrame();
         tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
         dos = tb.getDataOutput();
-        appender = new FrameTupleAppender(invIndexDataflowHelper.getHyracksTaskContext().getFrameSize());
+        appender = new FrameTupleAppender(ctx.getFrameSize());
         appender.reset(writeBuffer, true);
 
         indexAccessor = invIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
@@ -156,13 +147,9 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
-            writer.close();
-        } finally {
-            invIndexDataflowHelper.deinit();
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(writeBuffer, writer);
         }
+        writer.close();
     }
 }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 0582dbd..fbfd4b8 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -17,8 +17,6 @@
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -66,15 +64,10 @@
     }
 
     @Override
-    public ITreeIndex createIndexInstance() throws HyracksDataException {
+    public ITreeIndex getIndexInstance() throws HyracksDataException {
         ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
         InMemoryBufferCache memBufferCache = new InMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
                 memNumPages, new TransientFileMapManager());
-        IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
-        FileReference file = fileSplitProvider.getFileSplits()[partition].getLocalFile();
-        if (file.getFile().exists() && !file.getFile().isDirectory()) {
-            file.delete();
-        }
         InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
         return LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ctx.getIOManager(), file, opDesc
                 .getStorageManager().getBufferCache(ctx), opDesc.getStorageManager().getFileMapProvider(ctx),
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
index d311e56..b021300 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
@@ -21,7 +21,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
@@ -83,15 +82,10 @@
     }
 
     @Override
-    public ITreeIndex createIndexInstance() throws HyracksDataException {
+    public ITreeIndex getIndexInstance() throws HyracksDataException {
         ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
         InMemoryBufferCache memBufferCache = new LSMRTreeInMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
                 memNumPages);
-        IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
-        FileReference file = fileSplitProvider.getFileSplits()[partition].getLocalFile();
-        if (file.getFile().exists() && !file.getFile().isDirectory()) {
-            file.delete();
-        }
         InMemoryFreePageManager memFreePageManager = new LSMRTreeInMemoryFreePageManager(memNumPages,
                 metaDataFrameFactory);
 
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
index c3660d1..f513460 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
@@ -37,10 +37,9 @@
     }
 
     @Override
-    public ITreeIndex createIndexInstance() throws HyracksDataException {
+    public ITreeIndex getIndexInstance() throws HyracksDataException {
         return RTreeUtils.createRTree(treeOpDesc.getStorageManager().getBufferCache(ctx), treeOpDesc
                 .getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(),
-                valueProviderFactories, treeOpDesc.getTreeIndexComparatorFactories(), rtreePolicyType, treeOpDesc
-                        .getFileSplitProvider().getFileSplits()[partition].getLocalFile());
+                valueProviderFactories, treeOpDesc.getTreeIndexComparatorFactories(), rtreePolicyType, file);
     }
 }