Modified RTree search operator to extend the common TreeIndexSearchOperator.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1195 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index fd65c20..2de7c79 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -15,153 +15,39 @@
 
 package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
 
-import java.io.DataOutput;
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexSearchOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
-import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
 
-public class RTreeSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-    private TreeIndexDataflowHelper treeIndexHelper;
-    private FrameTupleAccessor accessor;
-
-    private ByteBuffer writeBuffer;
-    private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
-
-    private RTree rtree;
-    private PermutingFrameTupleReference searchKey;
-    private SearchPredicate searchPred;
-    private MultiComparator cmp;
-    private ITreeIndexCursor cursor;
-    private ITreeIndexFrame interiorFrame;
-    private ITreeIndexFrame leafFrame;
-    private ITreeIndexAccessor indexAccessor;
-
-    private RecordDescriptor recDesc;
+public class RTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
+    protected PermutingFrameTupleReference searchKey;
+    protected MultiComparator cmp;
 
     public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
-        treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
-                opDesc, ctx, partition, false);
-        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        super(opDesc, ctx, partition, recordDescProvider);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
             searchKey.setFieldPermutation(keyFields);
         }
     }
 
-    @Override
-    public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);        
-        try {
-            treeIndexHelper.init();
-            writer.open();
-            try {
-                rtree = (RTree) treeIndexHelper.getIndex();
-                interiorFrame = rtree.getInteriorFrameFactory().createFrame();
-                leafFrame = rtree.getLeafFrameFactory().createFrame();
-                cursor = new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrame, (IRTreeLeafFrame) leafFrame);
-                cmp = RTreeUtils.getSearchMultiComparator(rtree.getComparatorFactories(), searchKey);
+	@Override
+	protected ISearchPredicate createSearchPredicate() {
+	    RTree rtree = (RTree) treeIndex;
+	    cmp = RTreeUtils.getSearchMultiComparator(rtree.getComparatorFactories(), searchKey);
+	    return new SearchPredicate(searchKey, cmp);
+	}
 
-                searchPred = new SearchPredicate(searchKey, cmp);
-                writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
-                tb = new ArrayTupleBuilder(rtree.getFieldCount());
-                dos = tb.getDataOutput();
-                appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
-                appender.reset(writeBuffer, true);
-                indexAccessor = rtree.createAccessor();
-            } catch (Exception e) {
-                writer.fail();
-                throw e;
-            }
-
-        } catch (Exception e) {
-            treeIndexHelper.deinit();
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void writeSearchResults() throws Exception {
-        while (cursor.hasNext()) {
-            tb.reset();
-            cursor.next();
-
-            ITupleReference frameTuple = cursor.getTuple();
-            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-                tb.addFieldEndOffset();
-            }
-
-            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-                appender.reset(writeBuffer, true);
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    throw new IllegalStateException();
-                }
-            }
-        }
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
-
-        int tupleCount = accessor.getTupleCount();
-        try {
-            for (int i = 0; i < tupleCount; i++) {
-                searchKey.reset(accessor, i);
-
-                cursor.reset();
-                indexAccessor.search(cursor, searchPred);
-                writeSearchResults();
-            }
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(writeBuffer, writer);
-            }
-            writer.close();
-            try {
-                cursor.close();
-            } catch (Exception e) {
-                throw new HyracksDataException(e);
-            }
-        } finally {
-            treeIndexHelper.deinit();
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
+	@Override
+	protected void resetSearchPredicate(int tupleIndex) {
+	    searchKey.reset(accessor, tupleIndex);
+	}
 }
\ No newline at end of file