moved opcallbacks out of dataflowhelpers and did some minor cleanup

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1705 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 9c295d7..3d29b06 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -15,67 +15,54 @@
 
 package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.common.file.IIndexArtifactMap;
 
 public abstract class IndexDataflowHelper {
-    protected IIndex index;
-    protected int indexFileId = -1;
-
     protected final int partition;
     protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
-    protected transient IModificationOperationCallback modificationOperationCallback;
-    protected transient ISearchOperationCallback searchOperationCallback;
+    protected final IIndexArtifactMap indexArtifactMap;
+
+    protected IIndex index;
+    protected long resourceID = -1;
 
     public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final IHyracksTaskContext ctx, int partition) {
         this.opDesc = opDesc;
         this.ctx = ctx;
         this.partition = partition;
+        this.indexArtifactMap = opDesc.getStorageManager().getIndexArtifactMap(ctx);
     }
 
     public void init(boolean forceCreate) throws HyracksDataException {
         IndexRegistry<IIndex> indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
-        IIndexArtifactMap indexArtifactMap = opDesc.getStorageManager().getIndexArtifactMap(ctx);
-        FileReference fileRef = getFilereference();
-
-        // check whether the requested index instance already exists by
-        // retrieving indexRegistry with resourceId
-        // To do so, checking the index directory in the first ioDevice is
-        // sufficient.
-
-        // create a fullDir(= IODeviceDir + baseDir) for the requested index
-        IIOManager ioManager = ctx.getIOManager();
-        List<IODeviceHandle> ioDeviceHandles = ioManager.getIODevices();
+        FileReference file = getFilereference();
+        List<IODeviceHandle> ioDeviceHandles = ctx.getIOManager().getIODevices();
         String fullDir = ioDeviceHandles.get(0).getPath().toString();
-        if (!fullDir.endsWith(System.getProperty("file.separator"))) {
-            fullDir += System.getProperty("file.separator");
+        if (!fullDir.endsWith(File.separator)) {
+            fullDir += File.separator;
         }
-        String baseDir = fileRef.getFile().getPath();
-        if (!baseDir.endsWith(System.getProperty("file.separator"))) {
-            baseDir += System.getProperty("file.separator");
+        String baseDir = file.getFile().getPath();
+        if (!baseDir.endsWith(File.separator)) {
+            baseDir += File.separator;
         }
         fullDir += baseDir;
 
-        // get the corresponding resourceId with the fullDir
-        long resourceId = indexArtifactMap.get(fullDir);
+        resourceID = indexArtifactMap.get(fullDir);
 
         // Create new index instance and register it.
         synchronized (indexRegistry) {
             // Check if the index has already been registered.
             boolean register = false;
-            index = indexRegistry.get(resourceId);
+            index = indexRegistry.get(resourceID);
             if (index == null) {
                 index = createIndexInstance();
                 register = true;
@@ -84,20 +71,16 @@
                 index.create();
                 // Create new resourceId
                 try {
-                    resourceId = indexArtifactMap.create(baseDir, ioDeviceHandles);
+                    resourceID = indexArtifactMap.create(baseDir, ioDeviceHandles);
                 } catch (IOException e) {
                     throw new HyracksDataException(e);
                 }
             }
             if (register) {
                 index.activate();
-                indexRegistry.register(resourceId, index);
+                indexRegistry.register(resourceID, index);
             }
         }
-
-        //set operationCallback object
-        modificationOperationCallback = opDesc.getOpCallbackProvider().getModificationOperationCallback(resourceId);
-        searchOperationCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(resourceId);
     }
 
     public abstract IIndex createIndexInstance() throws HyracksDataException;
@@ -122,20 +105,7 @@
         return opDesc;
     }
 
-    public int getIndexFileId() {
-        return indexFileId;
+    public long getResourceID() {
+        return resourceID;
     }
-
-    public IOperationCallbackProvider getOpCallbackProvider() {
-        return opDesc.getOpCallbackProvider();
-    }
-
-    public IModificationOperationCallback getModificationOperationCallback() {
-        return modificationOperationCallback;
-    }
-
-    public ISearchOperationCallback getSearchOperationCallback() {
-        return searchOperationCallback;
-    }
-
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 33796e6..3c67fcd 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -24,19 +24,23 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 
 public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private final AbstractTreeIndexOperatorDescriptor opDesc;
     private final TreeIndexDataflowHelper treeIndexHelper;
     private ITreeIndex treeIndex;
 
     public TreeIndexDiskOrderScanOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition) {
-        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition);
+        this.opDesc = opDesc;
+        this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+                .createIndexDataflowHelper(opDesc, ctx, partition);
     }
 
     @Override
@@ -46,8 +50,10 @@
             treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
             ITreeIndexFrame cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
             ITreeIndexCursor cursor = treeIndexHelper.createDiskOrderScanCursor(cursorFrame);
+            ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
+                    treeIndexHelper.getResourceID());
             ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor(
-                    treeIndexHelper.getModificationOperationCallback(), treeIndexHelper.getSearchOperationCallback());
+                    NoOpOperationCallback.INSTANCE, searchCallback);
             writer.open();
             try {
                 indexAccessor.diskOrderScan(cursor);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 017cb75..d7a4b99 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 
 public class TreeIndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -41,12 +42,13 @@
     private ByteBuffer writeBuffer;
     private IIndexAccessor indexAccessor;
     private ITupleFilter tupleFilter;
+    private IModificationOperationCallback modCallback;
 
     public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
             IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
             IRecordDescriptorProvider recordDescProvider, IndexOp op) {
-        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition);
+        this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+                .createIndexDataflowHelper(opDesc, ctx, partition);
         this.recordDescProvider = recordDescProvider;
         this.op = op;
         tuple.setFieldPermutation(fieldPermutation);
@@ -63,8 +65,9 @@
         try {
             treeIndexHelper.init(false);
             ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
-            indexAccessor = treeIndex.createAccessor(treeIndexHelper.getModificationOperationCallback(),
-                    treeIndexHelper.getSearchOperationCallback());
+            modCallback = opDesc.getOpCallbackProvider().getModificationOperationCallback(
+                    treeIndexHelper.getResourceID());
+            indexAccessor = treeIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(treeIndexHelper.ctx);
@@ -90,9 +93,8 @@
                     }
                 }
                 tuple.reset(accessor, i);
-                IModificationOperationCallback modificationCallback = treeIndexHelper
-                        .getModificationOperationCallback();
-                modificationCallback.before(tuple);
+
+                modCallback.before(tuple);
                 switch (op) {
                     case INSERT: {
                         indexAccessor.insert(tuple);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index 7e23d29..a49e9b3 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -30,11 +30,14 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 
 public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    protected final AbstractTreeIndexOperatorDescriptor opDesc;
     protected final TreeIndexDataflowHelper treeIndexHelper;
     protected FrameTupleAccessor accessor;
 
@@ -55,10 +58,11 @@
 
     public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, IRecordDescriptorProvider recordDescProvider) {
-        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition);
+        this.opDesc = opDesc;
+        this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+                .createIndexDataflowHelper(opDesc, ctx, partition);
         this.retainInput = treeIndexHelper.getOperatorDescriptor().getRetainInput();
-        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        this.inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
     }
 
     protected abstract ISearchPredicate createSearchPredicate();
@@ -73,7 +77,7 @@
     public void open() throws HyracksDataException {
         accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
         writer.open();
-        try {        	
+        try {
             treeIndexHelper.init(false);
             treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
             cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
@@ -83,11 +87,12 @@
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
-            indexAccessor = treeIndex.createAccessor(treeIndexHelper.getModificationOperationCallback(),
-                    treeIndexHelper.getSearchOperationCallback());
+            ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
+                    treeIndexHelper.getResourceID());
+            indexAccessor = treeIndex.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
             cursor = createCursor();
             if (retainInput) {
-            	frameTuple = new FrameTupleReference();
+                frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
             treeIndexHelper.deinit();
@@ -100,9 +105,9 @@
             tb.reset();
             cursor.next();
             if (retainInput) {
-            	frameTuple.reset(accessor, tupleIndex);
+                frameTuple.reset(accessor, tupleIndex);
                 for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-                	dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                    dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
                     tb.addFieldEndOffset();
                 }
             }