More index operator cleanup for better code sharing with the lsm inverted index.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1894 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 2f0679c..dc5d161 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -19,12 +19,13 @@
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexSearchOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
-public class BTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
+public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
     protected final boolean lowKeyInclusive;
     protected final boolean highKeyInclusive;
 
@@ -61,6 +62,7 @@
 
     @Override
     protected ISearchPredicate createSearchPredicate() {
+        ITreeIndex treeIndex = (ITreeIndex) index;
         lowKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
         highKeySearchCmp = BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), highKey);
         return new RangePredicate(lowKey, highKey, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp, highKeySearchCmp);
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
index 22e95b2..648e523 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeUpdateSearchOperatorNodePushable.java
@@ -21,7 +21,9 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleUpdater;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 
@@ -38,6 +40,8 @@
 
     @Override
     protected ITreeIndexCursor createCursor() {
+        ITreeIndex treeIndex = (ITreeIndex) index;
+        ITreeIndexFrame cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
         return new BTreeRangeSearchCursor((IBTreeLeafFrame) cursorFrame, true);
     }
 
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
similarity index 87%
rename from hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
rename to hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 57880cd..2ccfdc7 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -24,33 +24,32 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
 import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
-public class TreeIndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-    private final AbstractTreeIndexOperatorDescriptor opDesc;
+public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private final IIndexOperatorDescriptor opDesc;
     private final IHyracksTaskContext ctx;
-    private final IIndexDataflowHelper indexHelper;    
+    private final IIndexDataflowHelper indexHelper;
     private final IRecordDescriptorProvider recordDescProvider;
     private final IndexOp op;
     private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
     private FrameTupleAccessor accessor;
-    private FrameTupleReference frameTuple;    
+    private FrameTupleReference frameTuple;
     private ByteBuffer writeBuffer;
     private IIndexAccessor indexAccessor;
     private ITupleFilter tupleFilter;
     private IModificationOperationCallback modCallback;
 
-    public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
-            IRecordDescriptorProvider recordDescProvider, IndexOp op) {
+    public IndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOp op) {
         this.opDesc = opDesc;
         this.ctx = ctx;
         this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
@@ -66,10 +65,10 @@
         writeBuffer = ctx.allocateFrame();
         writer.open();
         indexHelper.open();
-        ITreeIndex treeIndex = (ITreeIndex) indexHelper.getIndexInstance();
+        IIndex index = indexHelper.getIndexInstance();
         try {
             modCallback = opDesc.getOpCallbackProvider().getModificationOperationCallback(indexHelper.getResourceID());
-            indexAccessor = treeIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+            indexAccessor = index.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
@@ -135,7 +134,6 @@
         } finally {
             indexHelper.close();
         }
-
     }
 
     @Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
similarity index 88%
rename from hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
rename to hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 31fbe18..390cbbc 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -28,17 +28,16 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 
-public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-    protected final AbstractTreeIndexOperatorDescriptor opDesc;
+public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    protected final IIndexOperatorDescriptor opDesc;
     protected final IHyracksTaskContext ctx;
     protected final IIndexDataflowHelper indexHelper;
     protected FrameTupleAccessor accessor;
@@ -48,18 +47,17 @@
     protected ArrayTupleBuilder tb;
     protected DataOutput dos;
 
-    protected ITreeIndex treeIndex;
+    protected IIndex index;
     protected ISearchPredicate searchPred;
     protected IIndexCursor cursor;
-    protected ITreeIndexFrame cursorFrame;
     protected IIndexAccessor indexAccessor;
 
     protected final RecordDescriptor inputRecDesc;
     protected final boolean retainInput;
     protected FrameTupleReference frameTuple;
 
-    public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, IRecordDescriptorProvider recordDescProvider) {
+    public IndexSearchOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            IRecordDescriptorProvider recordDescProvider) {
         this.opDesc = opDesc;
         this.ctx = ctx;
         this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
@@ -80,9 +78,8 @@
         accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
         writer.open();
         indexHelper.open();
-        treeIndex = (ITreeIndex) indexHelper.getIndexInstance();
+        index = indexHelper.getIndexInstance();
         try {
-            cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
             searchPred = createSearchPredicate();
             writeBuffer = ctx.allocateFrame();
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
@@ -91,7 +88,7 @@
             appender.reset(writeBuffer, true);
             ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
                     indexHelper.getResourceID());
-            indexAccessor = treeIndex.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
+            indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
             cursor = createCursor();
             if (retainInput) {
                 frameTuple = new FrameTupleReference();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index 58980d2..db33cee 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -34,8 +34,7 @@
     private static final long serialVersionUID = 1L;
 
     private final int[] fieldPermutation;
-
-    private IndexOp op;
+    private final IndexOp op;
 
     public TreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
             IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
@@ -52,7 +51,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new TreeIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+        return new IndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
                 recordDescProvider, op);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearcher.java
index 1d3b4ef..5dc0696 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearcher.java
@@ -32,7 +32,7 @@
 
     public IFrameTupleAccessor createResultFrameTupleAccessor();
 
-    public ITupleReference createResultTupleReference();
+    public ITupleReference createResultFrameTupleReference();
 
     public List<ByteBuffer> getResultBuffers();
 
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
new file mode 100644
index 0000000..3bde5be
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class LSMInvertedIndexInsertUpdateDeleteOperator extends AbstractLSMInvertedIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fieldPermutation;
+    private final IndexOp op;
+
+    public LSMInvertedIndexInsertUpdateDeleteOperator(IOperatorDescriptorRegistry spec,
+            IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
+            IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
+            IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
+            int[] fieldPermutation, IndexOp op, IIndexDataflowHelperFactory dataflowHelperFactory,
+            IOperationCallbackProvider opCallbackProvider) {
+        super(spec, 0, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+                tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+                dataflowHelperFactory, null, false, opCallbackProvider);
+        this.fieldPermutation = fieldPermutation;
+        this.op = op;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new IndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+                recordDescProvider, op);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
index 2752426..a997575 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
@@ -57,7 +57,7 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         IInvertedIndexSearchModifier searchModifier = searchModifierFactory.createSearchModifier();
-        return new LSMInvertedIndexSearchOperatorNodePushable(this, ctx, partition, queryField, searchModifier,
-                recordDescProvider);
+        return new LSMInvertedIndexSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, queryField,
+                searchModifier);
     }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 80acff5..d825c02 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -15,151 +15,46 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
 
-import java.io.DataOutput;
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
 
-public class LSMInvertedIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-    private final AbstractLSMInvertedIndexOperatorDescriptor opDesc;
-    private final IHyracksTaskContext ctx;
-    private final IIndexDataflowHelper invIndexDataflowHelper;
-    private final int queryField;
-    private FrameTupleAccessor accessor;
-    private FrameTupleReference tuple;
-    private IRecordDescriptorProvider recordDescProvider;
-    private IIndex invIndex;
+public class LSMInvertedIndexSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
 
-    private final InvertedIndexSearchPredicate searchPred;
-    private IIndexAccessor indexAccessor;
-    private IIndexCursor resultCursor;
+    protected final IInvertedIndexSearchModifier searchModifier;
+    protected final int queryFieldIndex;
+    protected final int invListFields;
 
-    private ByteBuffer writeBuffer;
-    private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
-
-    private final boolean retainInput;
-
-    public LSMInvertedIndexSearchOperatorNodePushable(AbstractLSMInvertedIndexOperatorDescriptor opDesc,
-            IHyracksTaskContext ctx, int partition, int queryField, IInvertedIndexSearchModifier searchModifier,
-            IRecordDescriptorProvider recordDescProvider) {
-        this.opDesc = opDesc;
-        this.ctx = ctx;
-        this.invIndexDataflowHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx,
-                partition);
-        this.queryField = queryField;
-        this.searchPred = new InvertedIndexSearchPredicate(opDesc.getTokenizerFactory().createTokenizer(),
-                searchModifier);
-        this.recordDescProvider = recordDescProvider;
-        this.retainInput = opDesc.getRetainInput();
+    public LSMInvertedIndexSearchOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, IRecordDescriptorProvider recordDescProvider, int queryFieldIndex,
+            IInvertedIndexSearchModifier searchModifier) {
+        super(opDesc, ctx, partition, recordDescProvider);
+        this.searchModifier = searchModifier;
+        this.queryFieldIndex = queryFieldIndex;
+        // If retainInput is true, the frameTuple is created in IndexSearchOperatorNodePushable.open().
+        if (!opDesc.getRetainInput()) {
+            this.frameTuple = new FrameTupleReference();
+        }
+        AbstractLSMInvertedIndexOperatorDescriptor invIndexOpDesc = (AbstractLSMInvertedIndexOperatorDescriptor) opDesc;
+        invListFields = invIndexOpDesc.getInvListsTypeTraits().length;
     }
 
     @Override
-    public void open() throws HyracksDataException {
-        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
-        tuple = new FrameTupleReference();
-
-        invIndexDataflowHelper.open();
-        invIndex = invIndexDataflowHelper.getIndexInstance();
-        try {
-            writeBuffer = ctx.allocateFrame();
-            tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
-            dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(ctx.getFrameSize());
-            appender.reset(writeBuffer, true);
-
-            indexAccessor = invIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-            resultCursor = indexAccessor.createSearchCursor();
-            writer.open();
-        } catch (HyracksDataException e) {
-            invIndexDataflowHelper.close();
-            throw e;
-        }
-    }
-
-    private void writeSearchResults() throws Exception {
-        while (resultCursor.hasNext()) {
-            resultCursor.next();
-            tb.reset();
-            if (retainInput) {
-                for (int i = 0; i < tuple.getFieldCount(); i++) {
-                    dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
-                    tb.addFieldEndOffset();
-                }
-            }
-            ITupleReference invListElement = resultCursor.getTuple();
-            int invListFields = opDesc.getInvListsTypeTraits().length;
-            for (int i = 0; i < invListFields; i++) {
-                dos.write(invListElement.getFieldData(i), invListElement.getFieldStart(i),
-                        invListElement.getFieldLength(i));
-                tb.addFieldEndOffset();
-            }
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-                appender.reset(writeBuffer, true);
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    throw new IllegalStateException();
-                }
-            }
-        }
+    protected ISearchPredicate createSearchPredicate() {
+        AbstractLSMInvertedIndexOperatorDescriptor invIndexOpDesc = (AbstractLSMInvertedIndexOperatorDescriptor) opDesc;
+        return new InvertedIndexSearchPredicate(invIndexOpDesc.getTokenizerFactory().createTokenizer(), searchModifier);
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
-        int tupleCount = accessor.getTupleCount();
-        try {
-            for (int i = 0; i < tupleCount; i++) {
-                tuple.reset(accessor, i);
-                searchPred.setQueryTuple(tuple);
-                searchPred.setQueryFieldIndex(queryField);
-                try {
-                    resultCursor.reset();
-                    indexAccessor.search(resultCursor, searchPred);
-                    writeSearchResults();
-                } catch (OccurrenceThresholdPanicException e) {
-                    // Ignore panic cases for now.
-                }
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
-            writer.close();
-        } finally {
-            invIndexDataflowHelper.close();
-        }
+    protected void resetSearchPredicate(int tupleIndex) {
+        frameTuple.reset(accessor, tupleIndex);
+        InvertedIndexSearchPredicate invIndexSearchPred = (InvertedIndexSearchPredicate) searchPred;
+        invIndexSearchPred.setQueryTuple(frameTuple);
+        invIndexSearchPred.setQueryFieldIndex(queryFieldIndex);
     }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
index 7d16cb7..0e1a033 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
@@ -47,7 +47,6 @@
         this.opCtx = opCtx;
         this.index = index;
         this.searcher = new TOccurrenceSearcher(hyracksCtx, index);
-        // TODO: Ignore opcallbacks for now.
         this.btreeAccessor = (BTreeAccessor) index.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
     }
@@ -66,7 +65,7 @@
     
     @Override
     public IIndexCursor createSearchCursor() {
-        return new OnDiskInvertedIndexSearchCursor(searcher);
+        return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 8e15526..2fffe62 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -459,7 +459,7 @@
 
         @Override
         public IIndexCursor createSearchCursor() {
-            return new OnDiskInvertedIndexSearchCursor(searcher);
+            return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
         }
 
         @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
index 9f9b588..3060ef4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexSearchCursor.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
 
 public class OnDiskInvertedIndexSearchCursor implements IIndexCursor {
@@ -34,12 +35,19 @@
     private int tupleIndex = 0;
     private final IInvertedIndexSearcher invIndexSearcher;
     private final IFrameTupleAccessor fta;
-    private final FixedSizeTupleReference resultTuple;
+    private final FixedSizeTupleReference frameTuple;
+    private final PermutingTupleReference resultTuple;
     
-    public OnDiskInvertedIndexSearchCursor(IInvertedIndexSearcher invIndexSearcher) {
+    public OnDiskInvertedIndexSearchCursor(IInvertedIndexSearcher invIndexSearcher, int numInvListFields) {
         this.invIndexSearcher = invIndexSearcher;
         this.fta = invIndexSearcher.createResultFrameTupleAccessor();
-        this.resultTuple = (FixedSizeTupleReference) invIndexSearcher.createResultTupleReference();
+        this.frameTuple = (FixedSizeTupleReference) invIndexSearcher.createResultFrameTupleReference();
+        // Project away the occurrence count from the result tuples.
+        int[] fieldPermutation = new int[numInvListFields];
+        for (int i = 0; i < numInvListFields; i++) {
+            fieldPermutation[i] = i;
+        }
+        resultTuple = new PermutingTupleReference(fieldPermutation);
     }
 
     @Override
@@ -64,7 +72,8 @@
 
     @Override
     public void next() {
-        resultTuple.reset(fta.getBuffer().array(), fta.getTupleStartOffset(tupleIndex));
+        frameTuple.reset(fta.getBuffer().array(), fta.getTupleStartOffset(tupleIndex));
+        resultTuple.reset(frameTuple);
         tupleIndex++;
         if (tupleIndex >= fta.getTupleCount()) {
             if (currentBufferIndex + 1 < numResultBuffers) {
@@ -72,7 +81,7 @@
                 fta.reset(resultBuffers.get(currentBufferIndex));
                 tupleIndex = 0;
             }
-        }
+        }        
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index 2caa740..6d2fc73 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -499,7 +499,7 @@
         return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
     }
 
-    public ITupleReference createResultTupleReference() {
+    public ITupleReference createResultFrameTupleReference() {
         return new FixedSizeTupleReference(invListFieldsWithCount);
     }
 
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index 6ac7964..dd84e92 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -19,14 +19,15 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexSearchOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 
-public class RTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
+public class RTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
     protected PermutingFrameTupleReference searchKey;
     protected MultiComparator cmp;
 
@@ -42,6 +43,7 @@
 
     @Override
     protected ISearchPredicate createSearchPredicate() {
+        ITreeIndex treeIndex = (ITreeIndex) index;
         cmp = RTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), searchKey);
         return new SearchPredicate(searchKey, cmp);
     }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java b/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
similarity index 99%
rename from hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java
rename to hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
index 26a0037..85677e4 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMIndexFileManagerTest.java
@@ -44,7 +44,7 @@
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
 
-public class LSMTreeFileManagerTest {
+public class LSMIndexFileManagerTest {
     private static final int DEFAULT_PAGE_SIZE = 256;
     private static final int DEFAULT_NUM_PAGES = 100;
     private static final int DEFAULT_MAX_OPEN_FILES = 10;